+ # The user specifies constraints based on attributes; zero, one or
+ # more nodes can match these constraints
+ nodes = self._filter_based_on_attributes()
+
+ # nodes that are already part of the user's slice have priority to
+ # be provisioned
+ nodes_inslice = self._check_if_in_slice(nodes)
+ nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
+
+ node_id = None
+ if nodes_inslice:
+ node_id = self._choose_random_node(nodes_inslice)
+ self._slicenode = True
+
+ if not node_id:
+ # Either there were no matching nodes in the user's slice, or
+ # the nodes in the slice were blacklisted or being provisioned
+ # by another RM. Note that nodes_not_inslice is never empty
+ node_id = self._choose_random_node(nodes_not_inslice)
+ self._slicenode = False
+
+ if node_id:
+ self._node_to_provision = node_id
+ try:
+ self._set_hostname_attr(node_id)
+ self.info(" Selected node to provision ")
+ super(PlanetlabNode, self).do_discover()
+ except:
+ with PlanetlabNode.lock:
+ self._blacklist_node(node_id)
+ self.do_discover()
+ else:
+ self.fail_not_enough_nodes()
+
+ def do_provision(self):
+ """
+ Add the node to the user's slice after verifying that the node is
+ functioning correctly
+ """
+ if self._skip_provision():
+ super(PlanetlabNode, self).do_provision()
+ return
+
+ provision_ok = False
+ ssh_ok = False
+ proc_ok = False
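+ # maximum time (in seconds) to wait for the node to answer over ssh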
+ timeout = 1800
+
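+ # keep trying candidate nodes until one passes every check: slice membership,
+ # ssh login, /proc mounted and writable file system; nodes that fail are
+ # blacklisted and a new candidate is discovered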
+ while not provision_ok:
+ node = self._node_to_provision
+ if not self._slicenode:
+ self._add_node_to_slice(node)
+ if self._check_if_in_slice([node]):
+ self.debug( "Node added to slice" )
+ else:
+ self.warning(" Could not add to slice ")
+ with PlanetlabNode.lock:
+ self._blacklist_node(node)
+ self.do_discover()
+ continue
+
+ # check ssh connection
+ t = 0
+ while t < timeout and not ssh_ok:
+
+ cmd = 'echo \'GOOD NODE\''
+ ((out, err), proc) = self.execute(cmd)
+ if out.find("GOOD NODE") < 0:
+ self.debug( "No SSH connection, waiting 60s" )
+ t = t + 60
+ time.sleep(60)
+ continue
+ else:
+ self.debug( "SSH OK" )
+ ssh_ok = True
+ continue
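+ # the node was already part of the slice, so a single ssh check is enough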
+ else:
+ cmd = 'echo \'GOOD NODE\''
+ ((out, err), proc) = self.execute(cmd)
+ if not out.find("GOOD NODE") < 0:
+ ssh_ok = True
+
+ if not ssh_ok:
+ # the timeout was reached without establishing an ssh connection;
+ # the node is blacklisted, deleted from the slice, and a new
+ # node to provision is discovered
+ with PlanetlabNode.lock:
+ self.warning(" Could not SSH login ")
+ self._blacklist_node(node)
+ #self._delete_node_from_slice(node)
+ self.do_discover()
+ continue
+
+ # ssh is ok (ssh_ok = True); check that the /proc directory is
+ # mounted and the file system is not read-only
+ else:
+ cmd = 'mount |grep proc'
+ ((out1, err1), proc1) = self.execute(cmd)
+ cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
+ ((out2, err2), proc2) = self.execute(cmd)
+ if out1.find("/proc type proc") < 0 or \
+ "Read-only file system".lower() in err2.lower():
+ with PlanetlabNode.lock:
+ self.warning(" Corrupted file system ")
+ self._blacklist_node(node)
+ #self._delete_node_from_slice(node)
+ self.do_discover()
+ continue
+
+ else:
+ provision_ok = True
+ if not self.get('hostname'):
+ self._set_hostname_attr(node)
+ # set IP attribute
+ ip = self._get_ip(node)
+ self.set("ip", ip)
+ self.info(" Node provisioned ")
+
+ super(PlanetlabNode, self).do_provision()
+
+ def do_release(self):
+ super(PlanetlabNode, self).do_release()
+ if self.state == ResourceState.RELEASED and not self._skip_provision():
+ self.debug(" Releasing PLC API ")
+ self.plapi.release()
+
+ def _filter_based_on_attributes(self):
+ """
+ Retrieve the list of node ids that match the user's constraints
+ """
+ # Map the user-defined attributes to PlanetLab tag names
+ timeframe = self.get("timeframe")[0]
+ attr_to_tags = {
+ 'city' : 'city',
+ 'country' : 'country',
+ 'region' : 'region',
+ 'architecture' : 'arch',
+ 'operatingSystem' : 'fcdistro',
+ 'minReliability' : 'reliability%s' % timeframe,
+ 'maxReliability' : 'reliability%s' % timeframe,
+ 'minBandwidth' : 'bw%s' % timeframe,
+ 'maxBandwidth' : 'bw%s' % timeframe,
+ 'minLoad' : 'load%s' % timeframe,
+ 'maxLoad' : 'load%s' % timeframe,
+ 'minCpu' : 'cpu%s' % timeframe,
+ 'maxCpu' : 'cpu%s' % timeframe,
+ }
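+ # e.g. if timeframe is "month", timeframe[0] is 'm' and 'minCpu' is matched
+ # against the 'cpum' node tag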
+
+ nodes_id = []
+ filters = {}
+
+ for attr_name, attr_obj in self._attrs.iteritems():
+ attr_value = self.get(attr_name)
+
+ if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
+ attr_name != 'timeframe':
+
+ attr_tag = attr_to_tags[attr_name]
+ filters['tagname'] = attr_tag
+
+ # filter nodes by fixed constraints e.g. operating system
+ if 'min' not in attr_name and 'max' not in attr_name:
+ filters['value'] = attr_value
+ nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
+
+ # filter nodes by range constraints e.g. max bandwidth
+ elif 'min' in attr_name or 'max' in attr_name:
+ nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
+
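+ # if the user did not set any filterable attribute, return the ids of all nodes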
+ if not filters:
+ nodes = self._get_nodes_id()
+ for node in nodes:
+ nodes_id.append(node['node_id'])
+ return nodes_id
+
+ def _filter_by_fixed_attr(self, filters, nodes_id):
+ """
+ Query PLCAPI for node ids matching the fixed attributes defined by
+ the user
+ """
+ node_tags = self.plapi.get_node_tags(filters)
+ if node_tags is not None:
+
+ if len(nodes_id) == 0:
+ # first attribute being matched
+ for node_tag in node_tags:
+ nodes_id.append(node_tag['node_id'])
+ else:
+ # remove the node ids that don't match the new attribute
+ # that is being matched
+
+ nodes_id_tmp = []
+ for node_tag in node_tags:
+ if node_tag['node_id'] in nodes_id:
+ nodes_id_tmp.append(node_tag['node_id'])
+
+ if len(nodes_id_tmp):
+ nodes_id = set(nodes_id) & set(nodes_id_tmp)
+ else:
+ # none of the previously matched nodes satisfy the new constraint
+ self.fail_discovery()
+ else:
+ # no nodes match the filter applied
+ self.fail_discovery()
+
+ return nodes_id
+
+ def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
+ """
+ Query PLCAPI for node ids matching the attributes defined by the user
+ within a certain range
+ """
+ node_tags = self.plapi.get_node_tags(filters)
+ if node_tags:
+
+ if len(nodes_id) == 0:
+ # first attribute being matched
+ for node_tag in node_tags:
+
+ # check that it matches the min or max restriction
+ if 'min' in attr_name and node_tag['value'] != 'n/a' and \
+ float(node_tag['value']) > attr_value:
+ nodes_id.append(node_tag['node_id'])
+
+ elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
+ float(node_tag['value']) < attr_value:
+ nodes_id.append(node_tag['node_id'])
+ else:
+
+ # remove the node ids that don't match the new attribute
+ # that is being matched
+ nodes_id_tmp = []
+ for node_tag in node_tags:
+
+ # check that it matches the min or max restriction and also
+ # matched the previous filters
+ if 'min' in attr_name and node_tag['value'] != 'n/a' and \
+ float(node_tag['value']) > attr_value and \
+ node_tag['node_id'] in nodes_id:
+ nodes_id_tmp.append(node_tag['node_id'])
+
+ elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
+ float(node_tag['value']) < attr_value and \
+ node_tag['node_id'] in nodes_id:
+ nodes_id_tmp.append(node_tag['node_id'])
+
+ if len(nodes_id_tmp):
+ nodes_id = set(nodes_id) & set(nodes_id_tmp)
+ else:
+ # none of the previously matched nodes satisfy the new constraint
+ self.fail_discovery()
+
+ else: #TODO CHECK
+ # no nodes match the filter applied
+ self.fail_discovery()
+
+ return nodes_id
+
+ def _choose_random_node(self, nodes):
+ """
+ From the possible nodes for provisioning, choose one randomly to decrease
+ the probability of different RMs choosing the same node to provision
+ """
+ size = len(nodes)
+ while size:
+ size = size - 1
+ index = randint(0, size)
+ node_id = nodes[index]
+ nodes[index] = nodes[size]
+
+ # check that the node is not blacklisted or being provisioned by another
+ # RM, and ping it to check that it is really alive
+ with PlanetlabNode.lock:
+
+ blist = self.plapi.blacklisted()
+ plist = self.plapi.reserved()
+ if node_id not in blist and node_id not in plist:
+ ping_ok = self._do_ping(node_id)
+ if not ping_ok:
+ self._set_hostname_attr(node_id)
+ self.warning(" Node not responding PING ")
+ self._blacklist_node(node_id)
+ else:
+ # node discovered for provisioning, add it to the provision list
+ self._put_node_in_provision(node_id)
+ return node_id
+
+ def _get_nodes_id(self, filters=None):
+ return self.plapi.get_nodes(filters, fields=['node_id'])
+
+ def _add_node_to_slice(self, node_id):
+ self.info(" Adding node to slice ")
+ slicename = self.get("username")
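+ # hold the lock so concurrent RMs do not overwrite each other's slice updates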
+ with PlanetlabNode.lock:
+ slice_nodes = self.plapi.get_slice_nodes(slicename)
+ self.debug(" Previous slice nodes %s " % slice_nodes)
+ slice_nodes.append(node_id)
+ self.plapi.add_slice_nodes(slicename, slice_nodes)
+
+ def _delete_node_from_slice(self, node):
+ self.warning(" Deleting node from slice ")
+ slicename = self.get("username")
+ self.plapi.delete_slice_node(slicename, [node])
+
+ def _get_hostname(self):
+ hostname = self.get("hostname")
+ if hostname:
+ return hostname
+ ip = self.get("ip")
+ if ip:
+ hostname = socket.gethostbyaddr(ip)[0]
+ self.set('hostname', hostname)
+ return hostname
+ else:
+ return None
+
+ def _set_hostname_attr(self, node):
+ """
+ Query PLCAPI for the hostname of a given node id and set the
+ hostname attribute, overwriting the previous value
+ """
+ hostname = self.plapi.get_nodes(node, ['hostname'])
+ self.set("hostname", hostname[0]['hostname'])
+
+ def _check_if_in_slice(self, nodes_id):
+ """
+ Query PLCAPI to find out if any node id from nodes_id is in the user's
+ slice
+ """
+ slicename = self.get("username")
+ slice_nodes = self.plapi.get_slice_nodes(slicename)
+ nodes_inslice = list(set(nodes_id) & set(slice_nodes))
+ return nodes_inslice
+
+ def _do_ping(self, node_id):
+ """
+ Ping the IP address corresponding to the node id
+ """
+ ping_ok = False
+ ip = self._get_ip(node_id)
+ if ip:
+ command = "ping -c4 %s" % ip
+ (out, err) = lexec(command)
+
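+ # consider the node alive if less than 50% of the ping packets were lost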
+ m = re.search("(\d+)% packet loss", str(out))
+ if m and int(m.groups()[0]) < 50:
+ ping_ok = True