# # NEPI, a framework to manage network experiments # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Author: Alina Quereilhac # Lucia Guevgeozian from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, \ ResourceState, reschedule_delay from nepi.resources.linux.node import LinuxNode from nepi.resources.planetlab.plcapi import PLCAPIFactory from nepi.util.execfuncs import lexec from nepi.util import sshfuncs from random import randint import time import threading import datetime @clsinit_copy class PlanetlabNode(LinuxNode): _rtype = "PlanetlabNode" _help = "Controls a PlanetLab host accessible using a SSH key " \ "associated to a PlanetLab user account" _backend = "planetlab" lock = threading.Lock() @classmethod def _register_attributes(cls): ip = Attribute("ip", "PlanetLab host public IP address", flags = Flags.ReadOnly) pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \ (e.g. www.planet-lab.eu or www.planet-lab.org) ", default = "www.planet-lab.eu", flags = Flags.Credential) pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \ (e.g. https://%(hostname)s:443/PLCAPI/ ) ", default = "https://%(hostname)s:443/PLCAPI/", flags = Flags.ExecReadOnly) pl_user = Attribute("pluser", "PlanetLab account user, as the one to \ authenticate in the website) ", flags = Flags.Credential) pl_password = Attribute("plpassword", "PlanetLab account password, as \ the one to authenticate in the website) ", flags = Flags.Credential) city = Attribute("city", "Constrain location (city) during resource \ discovery. May use wildcards.", flags = Flags.Filter) country = Attribute("country", "Constrain location (country) during \ resource discovery. May use wildcards.", flags = Flags.Filter) region = Attribute("region", "Constrain location (region) during \ resource discovery. May use wildcards.", flags = Flags.Filter) architecture = Attribute("architecture", "Constrain architecture \ during resource discovery.", type = Types.Enumerate, allowed = ["x86_64", "i386"], flags = Flags.Filter) operating_system = Attribute("operatingSystem", "Constrain operating \ system during resource discovery.", type = Types.Enumerate, allowed = ["f8", "f12", "f14", "centos", "other"], flags = Flags.Filter) #site = Attribute("site", "Constrain the PlanetLab site this node \ # should reside on.", # type = Types.Enumerate, # allowed = ["PLE", # "PLC", # "PLJ"], # flags = Flags.Filter) min_reliability = Attribute("minReliability", "Constrain reliability \ while picking PlanetLab nodes. Specifies a lower \ acceptable bound.", type = Types.Double, range = (1, 100), flags = Flags.Filter) max_reliability = Attribute("maxReliability", "Constrain reliability \ while picking PlanetLab nodes. Specifies an upper \ acceptable bound.", type = Types.Double, range = (1, 100), flags = Flags.Filter) min_bandwidth = Attribute("minBandwidth", "Constrain available \ bandwidth while picking PlanetLab nodes. \ Specifies a lower acceptable bound.", type = Types.Double, range = (0, 2**31), flags = Flags.Filter) max_bandwidth = Attribute("maxBandwidth", "Constrain available \ bandwidth while picking PlanetLab nodes. \ Specifies an upper acceptable bound.", type = Types.Double, range = (0, 2**31), flags = Flags.Filter) min_load = Attribute("minLoad", "Constrain node load average while \ picking PlanetLab nodes. Specifies a lower acceptable \ bound.", type = Types.Double, range = (0, 2**31), flags = Flags.Filter) max_load = Attribute("maxLoad", "Constrain node load average while \ picking PlanetLab nodes. Specifies an upper acceptable \ bound.", type = Types.Double, range = (0, 2**31), flags = Flags.Filter) min_cpu = Attribute("minCpu", "Constrain available cpu time while \ picking PlanetLab nodes. Specifies a lower acceptable \ bound.", type = Types.Double, range = (0, 100), flags = Flags.Filter) max_cpu = Attribute("maxCpu", "Constrain available cpu time while \ picking PlanetLab nodes. Specifies an upper acceptable \ bound.", type = Types.Double, range = (0, 100), flags = Flags.Filter) timeframe = Attribute("timeframe", "Past time period in which to check\ information about the node. Values are year,month, \ week, latest", default = "week", type = Types.Enumerate, allowed = ["latest", "week", "month", "year"], flags = Flags.Filter) cls._register_attribute(ip) cls._register_attribute(pl_url) cls._register_attribute(pl_ptn) cls._register_attribute(pl_user) cls._register_attribute(pl_password) #cls._register_attribute(site) cls._register_attribute(city) cls._register_attribute(country) cls._register_attribute(region) cls._register_attribute(architecture) cls._register_attribute(operating_system) cls._register_attribute(min_reliability) cls._register_attribute(max_reliability) cls._register_attribute(min_bandwidth) cls._register_attribute(max_bandwidth) cls._register_attribute(min_load) cls._register_attribute(max_load) cls._register_attribute(min_cpu) cls._register_attribute(max_cpu) cls._register_attribute(timeframe) def __init__(self, ec, guid): super(PlanetlabNode, self).__init__(ec, guid) self._plapi = None self._node_to_provision = None self._slicenode = False def _skip_provision(self): pl_user = self.get("pluser") pl_pass = self.get("plpassword") if not pl_user and not pl_pass: return True else: return False @property def plapi(self): if not self._plapi: pl_user = self.get("pluser") pl_pass = self.get("plpassword") pl_url = self.get("plcApiUrl") pl_ptn = self.get("plcApiPattern") self._plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url, pl_ptn) if not self._plapi: self.fail_plapi() return self._plapi def do_discover(self): """ Based on the attributes defined by the user, discover the suitable nodes for provision. """ if self._skip_provision(): super(PlanetlabNode, self).do_discover() return hostname = self._get_hostname() if hostname: # the user specified one particular node to be provisioned # check with PLCAPI if it is alvive node_id = self._query_if_alive(hostname=hostname) node_id = node_id.pop() # check that the node is not blacklisted or being provisioned # by other RM with PlanetlabNode.lock: plist = self.plapi.reserved() blist = self.plapi.blacklisted() if node_id not in blist and node_id not in plist: # check that is really alive, by performing ping ping_ok = self._do_ping(node_id) if not ping_ok: self._blacklist_node(node_id) self.fail_node_not_alive(hostname) else: if self._check_if_in_slice([node_id]): self._slicenode = True self._put_node_in_provision(node_id) self._node_to_provision = node_id super(PlanetlabNode, self).do_discover() else: self.fail_node_not_available(hostname) else: # the user specifies constraints based on attributes, zero, one or # more nodes can match these constraints nodes = self._filter_based_on_attributes() nodes_alive = self._query_if_alive(nodes) # nodes that are already part of user's slice have the priority to # provisioned nodes_inslice = self._check_if_in_slice(nodes_alive) nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice)) node_id = None if nodes_inslice: node_id = self._choose_random_node(nodes_inslice) self._slicenode = True if not node_id: # Either there were no matching nodes in the user's slice, or # the nodes in the slice were blacklisted or being provisioned # by other RM. Note nodes_not_inslice is never empty node_id = self._choose_random_node(nodes_not_inslice) self._slicenode = False if node_id: self._node_to_provision = node_id try: self._set_hostname_attr(node_id) self.info(" Selected node to provision ") except: with PlanetlabNode.lock: self._blacklist_node(node_id) self.do_discover() super(PlanetlabNode, self).do_discover() else: self.fail_not_enough_nodes() def do_provision(self): """ Add node to user's slice after verifing that the node is functioning correctly """ if self._skip_provision(): super(PlanetlabNode, self).do_provision() return provision_ok = False ssh_ok = False proc_ok = False timeout = 1800 while not provision_ok: node = self._node_to_provision if not self._slicenode: self._add_node_to_slice(node) # check ssh connection t = 0 while t < timeout and not ssh_ok: cmd = 'echo \'GOOD NODE\'' ((out, err), proc) = self.execute(cmd) if out.find("GOOD NODE") < 0: t = t + 60 time.sleep(60) continue else: ssh_ok = True continue else: cmd = 'echo \'GOOD NODE\'' ((out, err), proc) = self.execute(cmd) if not out.find("GOOD NODE") < 0: ssh_ok = True if not ssh_ok: # the timeout was reach without establishing ssh connection # the node is blacklisted, deleted from the slice, and a new # node to provision is discovered with PlanetlabNode.lock: self.warn(" Could not SSH login ") self._blacklist_node(node) #self._delete_node_from_slice(node) self.set('hostname', None) self.do_discover() continue # check /proc directory is mounted (ssh_ok = True) else: cmd = 'mount |grep proc' ((out, err), proc) = self.execute(cmd) if out.find("/proc type proc") < 0: with PlanetlabNode.lock: self.warn(" Could not find directory /proc ") self._blacklist_node(node) #self._delete_node_from_slice(node) self.set('hostname', None) self.do_discover() continue else: provision_ok = True # set IP attribute ip = self._get_ip(node) self.set("ip", ip) super(PlanetlabNode, self).do_provision() def _filter_based_on_attributes(self): """ Retrive the list of nodes ids that match user's constraints """ # Map user's defined attributes with tagnames of PlanetLab timeframe = self.get("timeframe")[0] attr_to_tags = { 'city' : 'city', 'country' : 'country', 'region' : 'region', 'architecture' : 'arch', 'operatingSystem' : 'fcdistro', #'site' : 'pldistro', 'minReliability' : 'reliability%s' % timeframe, 'maxReliability' : 'reliability%s' % timeframe, 'minBandwidth' : 'bw%s' % timeframe, 'maxBandwidth' : 'bw%s' % timeframe, 'minLoad' : 'load%s' % timeframe, 'maxLoad' : 'load%s' % timeframe, 'minCpu' : 'cpu%s' % timeframe, 'maxCpu' : 'cpu%s' % timeframe, } nodes_id = [] filters = {} for attr_name, attr_obj in self._attrs.iteritems(): attr_value = self.get(attr_name) if attr_value is not None and attr_obj.flags == 8 and \ attr_name != 'timeframe': attr_tag = attr_to_tags[attr_name] filters['tagname'] = attr_tag # filter nodes by fixed constraints e.g. operating system if not 'min' in attr_name and not 'max' in attr_name: filters['value'] = attr_value nodes_id = self._filter_by_fixed_attr(filters, nodes_id) # filter nodes by range constraints e.g. max bandwidth elif ('min' or 'max') in attr_name: nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id) if not filters: nodes = self.plapi.get_nodes() for node in nodes: nodes_id.append(node['node_id']) return nodes_id def _filter_by_fixed_attr(self, filters, nodes_id): """ Query PLCAPI for nodes ids matching fixed attributes defined by the user """ node_tags = self.plapi.get_node_tags(filters) if node_tags is not None: if len(nodes_id) == 0: # first attribute being matched for node_tag in node_tags: nodes_id.append(node_tag['node_id']) else: # remove the nodes ids that don't match the new attribute # that is being match nodes_id_tmp = [] for node_tag in node_tags: if node_tag['node_id'] in nodes_id: nodes_id_tmp.append(node_tag['node_id']) if len(nodes_id_tmp): nodes_id = set(nodes_id) & set(nodes_id_tmp) else: # no node from before match the new constraint self.fail_discovery() else: # no nodes match the filter applied self.fail_discovery() return nodes_id def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id): """ Query PLCAPI for nodes ids matching attributes defined in a certain range, by the user """ node_tags = self.plapi.get_node_tags(filters) if node_tags: if len(nodes_id) == 0: # first attribute being matched for node_tag in node_tags: # check that matches the min or max restriction if 'min' in attr_name and node_tag['value'] != 'n/a' and \ float(node_tag['value']) > attr_value: nodes_id.append(node_tag['node_id']) elif 'max' in attr_name and node_tag['value'] != 'n/a' and \ float(node_tag['value']) < attr_value: nodes_id.append(node_tag['node_id']) else: # remove the nodes ids that don't match the new attribute # that is being match nodes_id_tmp = [] for node_tag in node_tags: # check that matches the min or max restriction and was a # matching previous filters if 'min' in attr_name and node_tag['value'] != 'n/a' and \ float(node_tag['value']) > attr_value and \ node_tag['node_id'] in nodes_id: nodes_id_tmp.append(node_tag['node_id']) elif 'max' in attr_name and node_tag['value'] != 'n/a' and \ float(node_tag['value']) < attr_value and \ node_tag['node_id'] in nodes_id: nodes_id_tmp.append(node_tag['node_id']) if len(nodes_id_tmp): nodes_id = set(nodes_id) & set(nodes_id_tmp) else: # no node from before match the new constraint self.fail_discovery() else: #TODO CHECK # no nodes match the filter applied self.fail_discovery() return nodes_id def _query_if_alive(self, nodes_id=None, hostname=None): """ Query PLCAPI for nodes that register activity recently, using filters related to the state of the node, e.g. last time it was contacted """ if nodes_id is None and hostname is None: msg = "Specify nodes_id or hostname" raise RuntimeError, msg if nodes_id is not None and hostname is not None: msg = "Specify either nodes_id or hostname" raise RuntimeError, msg # define PL filters to check the node is alive filters = dict() filters['run_level'] = 'boot' filters['boot_state'] = 'boot' filters['node_type'] = 'regular' #filters['>last_contact'] = int(time.time()) - 2*3600 # adding node_id or hostname to the filters to check for the particular # node if nodes_id: filters['node_id'] = list(nodes_id) alive_nodes_id = self._get_nodes_id(filters) elif hostname: filters['hostname'] = hostname alive_nodes_id = self._get_nodes_id(filters) if len(alive_nodes_id) == 0: self.fail_node_not_alive(hostname) else: nodes_id = list() for node_id in alive_nodes_id: nid = node_id['node_id'] nodes_id.append(nid) return nodes_id def _choose_random_node(self, nodes): """ From the possible nodes for provision, choose randomly to decrese the probability of different RMs choosing the same node for provision """ size = len(nodes) while size: size = size - 1 index = randint(0, size) node_id = nodes[index] nodes[index] = nodes[size] # check the node is not blacklisted or being provision by other RM # and perform ping to check that is really alive with PlanetlabNode.lock: blist = self.plapi.blacklisted() plist = self.plapi.reserved() if node_id not in blist and node_id not in plist: ping_ok = self._do_ping(node_id) if not ping_ok: self._set_hostname_attr(node_id) self.warn(" Node not responding PING ") self._blacklist_node(node_id) self.set('hostname', None) else: # discovered node for provision, added to provision list self._put_node_in_provision(node_id) return node_id def _get_nodes_id(self, filters): return self.plapi.get_nodes(filters, fields=['node_id']) def _add_node_to_slice(self, node_id): self.info(" Adding node to slice ") slicename = self.get("username") with PlanetlabNode.lock: slice_nodes = self.plapi.get_slice_nodes(slicename) slice_nodes.append(node_id) self.plapi.add_slice_nodes(slicename, slice_nodes) def _delete_node_from_slice(self, node): self.warn(" Deleting node from slice ") slicename = self.get("username") self.plapi.delete_slice_node(slicename, [node]) def _get_hostname(self): hostname = self.get("hostname") ip = self.get("ip") if hostname: return hostname elif ip: hostname = sshfuncs.gethostbyname(ip) return hostname else: return None def _set_hostname_attr(self, node): """ Query PLCAPI for the hostname of a certain node id and sets the attribute hostname, it will over write the previous value """ hostname = self.plapi.get_nodes(node, ['hostname']) self.set("hostname", hostname[0]['hostname']) def _check_if_in_slice(self, nodes_id): """ Query PLCAPI to find out if any node id from nodes_id is in the user's slice """ slicename = self.get("username") slice_nodes = self.plapi.get_slice_nodes(slicename) nodes_inslice = list(set(nodes_id) & set(slice_nodes)) return nodes_inslice def _do_ping(self, node_id): """ Perform ping command on node's IP matching node id """ ping_ok = False ip = self._get_ip(node_id) if not ip: return ping_ok command = "ping -c4 %s" % ip (out, err) = lexec(command) if not out.find("2 received") or not out.find("3 received") or not \ out.find("4 received") < 0: ping_ok = True return ping_ok def _blacklist_node(self, node): """ Add node mal functioning node to blacklist """ self.warn(" Blacklisting malfunctioning node ") self._plapi.blacklist_host(node) def _put_node_in_provision(self, node): """ Add node to the list of nodes being provisioned, in order for other RMs to not try to provision the same one again """ self._plapi.reserve_host(node) def _get_ip(self, node_id): """ Query PLCAPI for the IP of a node with certain node id """ hostname = self.plapi.get_nodes(node_id, ['hostname'])[0] try: ip = sshfuncs.gethostbyname(hostname['hostname']) except: # Fail while trying to find the IP return None return ip def fail_discovery(self): msg = "Discovery failed. No candidates found for node" self.error(msg) raise RuntimeError, msg def fail_node_not_alive(self, hostname=None): msg = "Node %s not alive" % hostname raise RuntimeError, msg def fail_node_not_available(self, hostname): msg = "Node %s not available for provisioning" % hostname raise RuntimeError, msg def fail_not_enough_nodes(self): msg = "Not enough nodes available for provisioning" raise RuntimeError, msg def fail_plapi(self): msg = "Failing while trying to instanciate the PLC API.\nSet the" + \ " attributes pluser and plpassword." raise RuntimeError, msg def valid_connection(self, guid): # TODO: Validate! return True