from nepi.resources.linux.node import LinuxNode
from nepi.resources.planetlab.plcapi import PLCAPIFactory
from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
-
+import threading
@clsinit_copy
class PlanetlabNode(LinuxNode):
_rtype = "PlanetlabNode"
+ _blacklist = list()
+ _in_provision = list()
+
+ _lock_bl = threading.Lock()
+ _lock_inpro = threading.Lock()
+
+ @classmethod
+ def blacklist(cls):
+ """ Returns the blacklisted nodes
+
+ """
+ return cls._blacklist
+
+ @classmethod
+ def in_provision(cls):
+ """ Returns the nodes that anohter RM is trying to provision
+
+ """
+ return cls._in_provision
+
+ @classmethod
+ def lock_bl(cls):
+ """ Returns the lock for the blacklist
+
+ """
+ return cls._lock_bl
+
+ @classmethod
+ def lock_inpro(cls):
+ """ Returns the lock for the provision list
+
+ """
+ return cls._lock_inpro
+
+
@classmethod
def _register_attributes(cls):
ip = Attribute("ip", "PlanetLab host public IP address",
cls._register_attribute(min_cpu)
cls._register_attribute(max_cpu)
cls._register_attribute(timeframe)
+
def __init__(self, ec, guid):
super(PlanetlabNode, self).__init__(ec, guid)
return self._plapi
- #def discover(self):
- # hostname = self.get("hostname")
- # if hostname:
- # node_id = self.check_alive_and_active(hostname=hostname)
- # else:
- # from random import choice
- # nodes = self.filter_based_on_attributes()
- # nodes_alive = self.check_alive_and_active(nodes)
- # while in_blkl:
- # node_id = choice(nodes_alive)
-
-
- # self._discover_time = tnow()
- # self._state = ResourceState.DISCOVERED
- # return node_id
+ def discoveri(self):
+ bl = PlanetlabNode.blacklist()
+ inpro = PlanetlabNode.in_provision()
+ lockbl = PlanetlabNode.lock_bl()
+ lockinpro = PlanetlabNode.lock_inpro()
+ hostname = self.get("hostname")
+ if hostname:
+ node_id = self.check_alive_and_active(hostname=hostname)
+ if node_id not in bl and node_id not in inpro:
+ try_other = self.do_ping(node_id)
+ if try_other:
+ lockbl.acquire()
+ bl.append(node_id)
+ lockbl.release()
+ msg = "Node %s not alive, pick another node" % hostname
+ raise RuntimeError, msg
+ else:
+ self._discover_time = tnow()
+ self._state = ResourceState.DISCOVERED
+ return node_id
+ else:
+ msg = "Node %s not available for provisioning, pick another node" % hostname
+ raise RuntimeError, msg
+
+ else:
+ from random import randint
+ nodes = self.filter_based_on_attributes()
+ nodes_alive = self.check_alive_and_active(nodes)
+ nodes_inslice = self.check_if_in_slice(nodes_alive)
+ nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
+ if nodes_inslice:
+ size = len(nodes_inslice)
+ while size:
+ size = size - 1
+ index = randint(0, size)
+ node_id = nodes_inslice[index]
+ nodes_inslice[index] = nodes_inslice[size]
+ if node_id not in bl and node_id not in inpro:
+ try_other = self.do_ping(node_id)
+ if not try_other:
+ lockinpro.acquire()
+ inpro.append(node_id)
+ lockinpro.release()
+ self._discover_time = tnow()
+ self._state = ResourceState.DISCOVERED
+ return node_id
+ else:
+ lockbl.acquire()
+ bl.append(node_id)
+ lockbl.release()
+
+ if nodes_not_inslice:
+ size = len(nodes_not_inslice)
+ while size:
+ size = size - 1
+ index = randint(0, size)
+ node_id = nodes_not_inslice[index]
+ nodes_not_inslice[index] = nodes_not_inslice[size]
+ if node_id not in bl and node_id not in inpro:
+ try_other = self.do_ping(node_id)
+ if not try_other:
+ lockinpro.acquire()
+ inpro.append(node_id)
+ lockinpro.release()
+ self._discover_time = tnow()
+ self._state = ResourceState.DISCOVERED
+ return node_id
+ else:
+ lockbl.acquire()
+ bl.append(node_id)
+ lockbl.release()
+ msg = "Not enough nodes available for provisioning"
+ raise RuntimeError, msg
+
+
#def provision(self):
+ #de hostname para que provision haga add_node_slice, check que ip coincide con hostname
+ #command = "ssh %s@%s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no 'echo \'GOOD NODE\''" % (pl_slice, hostname p = subprocess.Popen(command, shell=True, stdout = subprocess.PIPE, stderr = subprocess.PIPE) stdout, stderr = p.communicate() if stdout.find("GOOD NODE") < 0: continue
+
+
def filter_based_on_attributes(self):
# Map attributes with tagnames of PL
filters = {}
for attr_name, attr_obj in self._attrs.iteritems():
attr_value = self.get(attr_name)
- print nodes_id
if attr_value is not None and attr_obj.flags == 8 and not 'min' in attr_name \
and not 'max' in attr_name and attr_name != 'timeframe':
attr_tag = attr_to_tags[attr_name]
nodes_id.append(nid)
return nodes_id
-# ip = self.plapi.get_interfaces({'node_id':nid}, fields=['ip'])
-# self.set('ip', ip[0]['ip'])
-#de hostname para que provision haga add_node_slice, check que ip coincide con hostname
+
+ def check_if_in_slice(self, nodes_id):
+ slicename = self.get("username")
+ slice_nodes = self.plapi.get_slice_nodes(slicename)
+ nodes_inslice = list(set(nodes_id) & set(slice_nodes))
+ return nodes_inslice
+
+ def do_ping(self, node_id):
+ ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
+ ip = ip[0]['ip']
+ import subprocess
+ result = subprocess.call(["ping","-c","2",ip],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+ if result == 0:
+ return False
+ elif result == 1 or result == 2:
+ return True
def fail(self):