Adding persistent blacklist for PL node
authorLucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Wed, 16 Apr 2014 17:12:47 +0000 (19:12 +0200)
committerLucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Wed, 16 Apr 2014 17:12:47 +0000 (19:12 +0200)
src/nepi/execution/attribute.py
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/plcapi.py

index bda72e2..fa2b104 100644 (file)
@@ -52,6 +52,8 @@ class Flags:
     # transparent to the user)
     Reserved  = 1 << 6 # 64
 
+    # Attribute global is set to all resources of rtype
+    Global  = 1 << 7 # 128
 
 class Attribute(object):
     """
index 1be5c3c..44b9578 100644 (file)
@@ -196,6 +196,16 @@ class ExperimentController(object):
         # EC state
         self._state = ECState.RUNNING
 
+        # Blacklist file for PL nodes
+        nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
+        plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
+        if not os.path.exists(plblacklist_file):
+            if os.path.isdir(nepi_home):
+                open(plblacklist_file, 'w').close()
+            else:
+                os.makedirs(nepi_home)
+                open(plblacklist_file, 'w').close()
+                    
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
@@ -609,6 +619,38 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         rm.set(name, value)
 
+    def get_global(self, rtype, name):
+        """ Returns the value of the global attribute with name 'name' on the
+        RMs of rtype 'rtype'.
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+            :param name: Name of the attribute 
+            :type name: str
+
+            :return: The value of the attribute with name 'name'
+
+        """
+        rclass = ResourceFactory.get_resource_type(rtype)
+        return rclass.get_global(name)
+
+    def set_global(self, rtype, name, value):
+        """ Modifies the value of the global attribute with name 'name' on the 
+        RMs of with rtype 'rtype'.
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+            :param name: Name of the attribute
+            :type name: str
+
+            :param value: Value of the attribute
+
+        """
+        rclass = ResourceFactory.get_resource_type(rtype)
+        return rclass.set_global(name, value)
+
     def state(self, guid, hr = False):
         """ Returns the state of a resource
 
index 5ad084d..d32a3dd 100644 (file)
@@ -186,7 +186,6 @@ class ResourceManager(Logger):
         attributes.
 
         """
-        
         critical = Attribute("critical", 
                 "Defines whether the resource is critical. "
                 "A failure on a critical resource will interrupt "
@@ -289,6 +288,32 @@ class ResourceManager(Logger):
         """
         return cls._backend
 
+    @classmethod
+    def get_global(cls, name):
+        """ Returns the value of a global attribute
+            Global attribute meaning an attribute for 
+            all the resources from a rtype
+
+        :param name: Name of the attribute
+        :type name: str
+        :rtype: str
+        """
+        global_attr = cls._attributes[name]
+        return global_attr.value
+
+    @classmethod
+    def set_global(cls, name, value):
+        """ Set value for a global attribute
+
+        :param name: Name of the attribute
+        :type name: str
+        :param name: Value of the attribute
+        :type name: str
+        """
+        global_attr = cls._attributes[name]
+        global_attr.value = value
+        return value
+
     def __init__(self, ec, guid):
         super(ResourceManager, self).__init__(self.get_rtype())
         
@@ -564,6 +589,9 @@ class ResourceManager(Logger):
         :rtype: str
         """
         attr = self._attrs[name]
+        if attr.has_flag(Flags.Global):
+            self.warning( "Attribute %s is global. Use get_global instead." % name)
+            
         return attr.value
 
     def has_changed(self, name):
index d2461bc..1f706a8 100644 (file)
@@ -27,10 +27,12 @@ from nepi.util.execfuncs import lexec
 from nepi.util import sshfuncs
 
 from random import randint
+import re
 import time
 import socket
 import threading
 import datetime
+import weakref
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
@@ -94,14 +96,6 @@ class PlanetlabNode(LinuxNode):
                                         "other"],
                             flags = Flags.Filter)
 
-        #site = Attribute("site", "Constrain the PlanetLab site this node \
-        #        should reside on.",
-        #        type = Types.Enumerate,
-        #        allowed = ["PLE",
-        #                    "PLC",
-        #                    "PLJ"],
-        #        flags = Flags.Filter)
-
         min_reliability = Attribute("minReliability", "Constrain reliability \
                             while picking PlanetLab nodes. Specifies a lower \
                             acceptable bound.",
@@ -169,21 +163,20 @@ class PlanetlabNode(LinuxNode):
                                     "year"],
                         flags = Flags.Filter)
 
-#        plblacklist = Attribute("blacklist", "Take into account the file plblacklist \
-#                        in the user's home directory under .nepi directory. This file \
-#                        contains a list of PL nodes to blacklist, and at the end \
-#                        of the experiment execution the new blacklisted nodes are added.",
-#                    type = Types.Bool,
-#                    default = True,
-#                    flags = Flags.ReadOnly)
-#
+        plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
+                        in the user's home directory under .nepi directory. This file \
+                        contains a list of PL nodes to blacklist, and at the end \
+                        of the experiment execution the new blacklisted nodes are added.",
+                    type = Types.Bool,
+                    default = False,
+                    flags = Flags.Global)
+
 
         cls._register_attribute(ip)
         cls._register_attribute(pl_url)
         cls._register_attribute(pl_ptn)
         cls._register_attribute(pl_user)
         cls._register_attribute(pl_password)
-        #cls._register_attribute(site)
         cls._register_attribute(city)
         cls._register_attribute(country)
         cls._register_attribute(region)
@@ -198,10 +191,12 @@ class PlanetlabNode(LinuxNode):
         cls._register_attribute(min_cpu)
         cls._register_attribute(max_cpu)
         cls._register_attribute(timeframe)
+        cls._register_attribute(plblacklist)
 
     def __init__(self, ec, guid):
         super(PlanetlabNode, self).__init__(ec, guid)
 
+        self._ecobj = weakref.ref(ec)
         self._plapi = None
         self._node_to_provision = None
         self._slicenode = False
@@ -225,14 +220,15 @@ class PlanetlabNode(LinuxNode):
             pl_pass = self.get("plpassword")
             pl_url = self.get("plcApiUrl")
             pl_ptn = self.get("plcApiPattern")
-
-            self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
-                pl_ptn)
+            _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
+                pl_ptn, self._ecobj())
             
-            if not self._plapi:
+            if not _plapi:
                 self.fail_plapi()
+        
+            self._plapi = weakref.ref(_plapi)
 
-        return self._plapi
+        return self._plapi()
 
     def do_discover(self):
         """
@@ -318,14 +314,21 @@ class PlanetlabNode(LinuxNode):
         provision_ok = False
         ssh_ok = False
         proc_ok = False
-        timeout = 3600
+        timeout = 1800
 
         while not provision_ok:
             node = self._node_to_provision
             if not self._slicenode:
                 self._add_node_to_slice(node)
-                self.info( " Node added to slice ")
-            
+                if self._check_if_in_slice([node]):
+                    self.debug( "Node added to slice" )
+                else:
+                    self.warning(" Could not add to slice ")
+                    with PlanetlabNode.lock:
+                        self._blacklist_node(node)
+                    self.do_discover()
+                    continue
+
                 # check ssh connection
                 t = 0 
                 while t < timeout and not ssh_ok:
@@ -333,12 +336,12 @@ class PlanetlabNode(LinuxNode):
                     cmd = 'echo \'GOOD NODE\''
                     ((out, err), proc) = self.execute(cmd)
                     if out.find("GOOD NODE") < 0:
-                        self.info(" No SSH login, sleeping for 60 seconds ")
+                        self.debug( "No SSH connection, waiting 60s" )
                         t = t + 60
                         time.sleep(60)
                         continue
                     else:
-                        self.info(" SSH login OK ")
+                        self.debug( "SSH OK" )
                         ssh_ok = True
                         continue
             else:
@@ -385,6 +388,12 @@ class PlanetlabNode(LinuxNode):
             
         super(PlanetlabNode, self).do_provision()
 
+    def do_release(self):
+        super(PlanetlabNode, self).do_release()
+        if self.state == ResourceState.RELEASED:
+            self.debug(" Releasing PLC API ")
+            self.plapi.release()
+
     def _filter_based_on_attributes(self):
         """
         Retrive the list of nodes ids that match user's constraints 
@@ -397,7 +406,6 @@ class PlanetlabNode(LinuxNode):
             'region' : 'region',
             'architecture' : 'arch',
             'operatingSystem' : 'fcdistro',
-            #'site' : 'pldistro',
             'minReliability' : 'reliability%s' % timeframe,
             'maxReliability' : 'reliability%s' % timeframe,
             'minBandwidth' : 'bw%s' % timeframe,
@@ -555,6 +563,7 @@ class PlanetlabNode(LinuxNode):
         slicename = self.get("username")
         with PlanetlabNode.lock:
             slice_nodes = self.plapi.get_slice_nodes(slicename)
+            self.debug(" Previous slice nodes %s " % slice_nodes)
             slice_nodes.append(node_id)
             self.plapi.add_slice_nodes(slicename, slice_nodes)
 
@@ -599,14 +608,13 @@ class PlanetlabNode(LinuxNode):
         """
         ping_ok = False
         ip = self._get_ip(node_id)
-        if not ip: return ping_ok
-
-        command = "ping -c4 %s" % ip
+        if ip:
+            command = "ping -c4 %s" % ip
+            (out, err) = lexec(command)
 
-        (out, err) = lexec(command)
-        if not str(out).find("2 received") < 0 or not str(out).find("3 received") < 0 or not \
-            str(out).find("4 received") < 0:
-            ping_ok = True
+            m = re.search("(\d+)% packet loss", str(out))
+            if m and int(m.groups()[0]) < 50:
+                ping_ok = True
        
         return ping_ok 
 
index 97c4846..4f4bf5c 100644 (file)
@@ -20,6 +20,7 @@
 import functools
 import hashlib
 import socket
+import os
 import time
 import threading
 import xmlrpclib
@@ -135,16 +136,15 @@ class PLCAPI(object):
      
     _required_methods = set()
 
-    def __init__(self, username = None, password = None, session_key = None, 
-            proxy = None,
-            hostname = "www.planet-lab.eu",
-            urlpattern = "https://%(hostname)s:443/PLCAPI/",
+    def __init__(self, username, password, hostname, urlpattern, ec, proxy, session_key = None, 
             local_peer = "PLE"):
 
         self._blacklist = set()
         self._reserved = set()
         self._nodes_cache = None
         self._already_cached = False
+        self._ecobj = ec
+        self.count = 1 
 
         if session_key is not None:
             self.auth = dict(AuthMethod='session', session=session_key)
@@ -175,7 +175,11 @@ class PLCAPI(object):
             self._proxy_transport = lambda : None
         
         self.threadlocal = threading.local()
-    
+
+        # Load blacklist from file
+        if self._ecobj.get_global('PlanetlabNode', 'persist_blacklist'):
+            self._set_blacklist()
+
     @property
     def api(self):
         # Cannot reuse same proxy in all threads, py2.7 is not threadsafe
@@ -213,6 +217,16 @@ class PLCAPI(object):
             warnings.warn(str(e))
         
         return True
+
+    def _set_blacklist(self):
+        nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
+        plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
+        with open(plblacklist_file, 'r') as f:
+            hosts_tobl = f.read().splitlines()
+            if hosts_tobl:
+                nodes_id = self.get_nodes(hosts_tobl, ['node_id'])
+                for node_id in nodes_id:
+                    self._blacklist.add(node_id['node_id'])
     
     @property
     def network_types(self):
@@ -429,8 +443,8 @@ class PLCAPI(object):
     def get_slice_nodes(self, slicename):
         return self.get_slices(slicename, ['node_ids'])[0]['node_ids']
 
-    def add_slice_nodes(self, slicename, nodes = None):
-        self.update_slice(slicename, nodes = nodes)
+    def add_slice_nodes(self, slicename, nodes):
+        self.update_slice(slicename, nodes=nodes)
 
     def get_node_info(self, node_id):
         self.start_multicall()
@@ -459,24 +473,39 @@ class PLCAPI(object):
         else:
             return None
 
-    def blacklist_host(self, hostname):
-        self._blacklist.add(hostname)
+    def blacklist_host(self, node_id):
+        self._blacklist.add(node_id)
 
     def blacklisted(self):
-        return self._blacklist
+        return self._blacklist 
 
-    def unblacklist_host(self, hostname):
-        del self._blacklist[hostname]
+    def unblacklist_host(self, node_id):
+        del self._blacklist[node_id]
 
-    def reserve_host(self, hostname):
-        self._reserved.add(hostname)
+    def reserve_host(self, node_id):
+        self._reserved.add(node_id)
 
     def reserved(self):
         return self._reserved
 
-    def unreserve_host(self, hostname):
-        del self._reserved[hostname]
+    def unreserve_host(self, node_id):
+        del self._reserved[node_id]
 
+    def release(self):
+        self.count -= 1
+        if self._ecobj.get_global('PlanetlabNode', 'persist_blacklist') and self.count == 0:
+            if self._blacklist:
+                to_blacklist = list()
+                hostnames = self.get_nodes(list(self._blacklist), ['hostname'])
+                for hostname in hostnames:
+                    to_blacklist.append(hostname['hostname'])
+
+                nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
+                plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
+
+                with open(plblacklist_file, 'w') as f:
+                    for host in to_blacklist:
+                        f.write("%s\n" % host)
 
 class PLCAPIFactory(object):
     """ 
@@ -492,8 +521,7 @@ class PLCAPIFactory(object):
 
     @classmethod 
     def get_api(cls, pl_user, pl_pass, pl_host,
-            pl_ptn = "https://%(hostname)s:443/PLCAPI/",
-            proxy = None):
+            pl_ptn, ec, proxy = None):
         """ Get existing PLCAPI instance
 
         :param pl_user: Planelab user name (used for web login)
@@ -512,14 +540,15 @@ class PLCAPIFactory(object):
             with cls._lock:
                 api = cls._apis.get(key)
                 if not api:
-                    api = cls.create_api(pl_user, pl_pass, pl_host, pl_ptn, proxy)
+                    api = cls.create_api(pl_user, pl_pass, pl_host, pl_ptn, ec, proxy)
+                else:
+                    api.count += 1
                 return api
         return None
 
     @classmethod 
     def create_api(cls, pl_user, pl_pass, pl_host,
-            pl_ptn = "https://%(hostname)s:443/PLCAPI/",
-            proxy = None):
+            pl_ptn, ec, proxy = None):
         """ Create an PLCAPI instance
 
         :param pl_user: Planelab user name (used for web login)
@@ -533,13 +562,8 @@ class PLCAPIFactory(object):
         :param proxy: Proxy service url
         :type pl_ptn: str
         """
-        api = PLCAPI(
-            username = pl_user,
-            password = pl_pass,
-            hostname = pl_host,
-            urlpattern = pl_ptn,
-            proxy = proxy
-        )
+        api = PLCAPI(username = pl_user, password = pl_pass, hostname = pl_host,
+            urlpattern = pl_ptn, ec = ec, proxy = proxy)
         key = cls._make_key(pl_user, pl_host)
         cls._apis[key] = api
         return api
@@ -555,3 +579,4 @@ class PLCAPIFactory(object):
         skey = "".join(map(str, args))
         return hashlib.md5(skey).hexdigest()
 
+