Blacklist unhealthy or unresponsive nodes
author    Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 22 Jun 2011 15:14:31 +0000 (17:14 +0200)
committer Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 22 Jun 2011 15:14:31 +0000 (17:14 +0200)
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/node.py
src/nepi/util/parallel.py

diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py
index 8813ae4..3a1b65a 100644
@@ -6,6 +6,7 @@ from nepi.core import testbed_impl
 from nepi.util.constants import TIME_NOW
 from nepi.util.graphtools import mst
 from nepi.util import ipaddr2
+import sys
 import os
 import os.path
 import time
@@ -36,6 +37,8 @@ class TestbedController(testbed_impl.TestbedController):
         self._node = node
         self._interfaces = interfaces
         self._app = application
+        
+        self._blacklist = set()
 
     @property
     def home_directory(self):
@@ -99,12 +102,23 @@ class TestbedController(testbed_impl.TestbedController):
     do_poststep_configure = staticmethod(do_post_asynclaunch)
 
     def do_preconfigure(self):
-        # Perform resource discovery if we don't have
-        # specific resources assigned yet
-        self.do_resource_discovery()
+        while True:
+            # Perform resource discovery if we don't have
+            # specific resources assigned yet
+            self.do_resource_discovery()
 
-        # Create PlanetLab slivers
-        self.do_provisioning()
+            # Create PlanetLab slivers
+            self.do_provisioning()
+            
+            try:
+                # Wait for provisioning
+                self.do_wait_nodes()
+                
+                # All nodes are up, we're done
+                break
+            except self._node.UnresponsiveNodeError:
+                # Some nodes were unresponsive; rerun discovery and provisioning
+                pass
         
         # Plan application deployment
         self.do_spanning_deployment_plan()
@@ -115,6 +129,11 @@ class TestbedController(testbed_impl.TestbedController):
     def do_resource_discovery(self):
         to_provision = self._to_provision = set()
         
+        reserved = set(self._blacklist)
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is not None:
+                reserved.add(node._node_id)
+        
         # Initial algo:
         #   look for perfectly defined nodes
         #   (ie: those with only one candidate)
@@ -124,9 +143,12 @@ class TestbedController(testbed_impl.TestbedController):
                 # If we have only one candidate, simply use it
                 candidates = node.find_candidates(
                     filter_slice_id = self.slice_id)
+                candidates -= reserved
                 if len(candidates) == 1:
-                    node.assign_node_id(iter(candidates).next())
-                else:
+                    node_id = iter(candidates).next()
+                    node.assign_node_id(node_id)
+                    reserved.add(node_id)
+                elif not candidates:
                     # Try again including unassigned nodes
                     candidates = node.find_candidates()
                     if len(candidates) > 1:
@@ -135,6 +157,7 @@ class TestbedController(testbed_impl.TestbedController):
                         node_id = iter(candidates).next()
                         node.assign_node_id(node_id)
                         to_provision.add(node_id)
+                        reserved.add(node_id)
                     elif not candidates:
                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
                             node.make_filter_description())
@@ -149,6 +172,7 @@ class TestbedController(testbed_impl.TestbedController):
                 # If we have only one candidate, simply use it
                 candidates = node.find_candidates(
                     filter_slice_id = self.slice_id)
+                candidates -= reserved
                 reqs.append(candidates)
                 nodes.append(node)
         
@@ -179,6 +203,40 @@ class TestbedController(testbed_impl.TestbedController):
         # cleanup
         del self._to_provision
     
+    def do_wait_nodes(self):
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node):
+                # Inject the configuration the node needs for SSH access
+                node.home_path = "nepi-node-%s" % (guid,)
+                node.ident_path = self.sliceSSHKey
+                node.slicename = self.slicename
+            
+                # Report the assignment
+                print "PlanetLab Node", guid, "configured at", node.hostname
+            
+        try:
+            for guid, node in self._elements.iteritems():
+                if isinstance(node, self._node.Node):
+                    print "Waiting for Node", guid, "configured at", node.hostname,
+                    sys.stdout.flush()
+                    
+                    node.wait_provisioning()
+                    
+                    print "READY"
+        except self._node.UnresponsiveNodeError:
+            # At least one node never came up
+            print "UNRESPONSIVE"
+            
+            # Add every unresponsive (dead) node to the blacklist, unassign it,
+            # and re-raise so preconfigure retries with fresh candidates
+            for guid, node in self._elements.iteritems():
+                if isinstance(node, self._node.Node):
+                    if not node.is_alive():
+                        print "Blacklisting", node.hostname, "for unresponsiveness"
+                        self._blacklist.add(node._node_id)
+                        node.unassign_node()
+            raise
+    
     def do_spanning_deployment_plan(self):
         # Create application groups by collecting all applications
         # based on their hash - the hash should contain everything that
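The preconfigure loop above keeps cycling through discovery, provisioning and
do_wait_nodes until every node responds: unresponsive nodes are blacklisted,
unassigned, and excluded from the next discovery pass. A minimal standalone
sketch of that retry pattern (DemoNode, pick_nodes and wait_healthy are
hypothetical stand-ins, not NEPI API):

    class UnresponsiveNodeError(RuntimeError):
        pass

    class DemoNode(object):
        def __init__(self, node_id, alive):
            self.node_id, self.alive = node_id, alive
        def is_alive(self):
            return self.alive
        def unassign(self):
            self.node_id = None

    def wait_healthy(nodes):
        # stand-in for do_wait_nodes: fail if any node never came up
        if any(not n.is_alive() for n in nodes):
            raise UnresponsiveNodeError("some nodes never came up")

    def provision_until_healthy(pick_nodes):
        blacklist = set()
        while True:
            # discovery/provisioning, skipping blacklisted node ids
            nodes = pick_nodes(blacklist)
            try:
                wait_healthy(nodes)
                return nodes
            except UnresponsiveNodeError:
                for n in nodes:
                    if not n.is_alive():
                        blacklist.add(n.node_id)   # never pick it again
                        n.unassign()               # free the slot for a retry

    def pick_nodes(blacklist):
        # pretend node 2 is dead; discovery skips blacklisted ids
        pool = [DemoNode(1, True), DemoNode(2, False), DemoNode(3, True)]
        return [n for n in pool if n.node_id not in blacklist]

Here provision_until_healthy(pick_nodes) blacklists node 2 on the first pass
and succeeds on the second, mirroring the controller's behavior.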
diff --git a/src/nepi/testbeds/planetlab/metadata_v01.py b/src/nepi/testbeds/planetlab/metadata_v01.py
index 85ae099..8ca0e99 100644
@@ -377,12 +377,6 @@ def configure_node(testbed_instance, guid):
     # Do some validations
     node.validate()
     
-    # recently provisioned nodes may not be up yet
-    sleeptime = 1.0
-    while not node.is_alive():
-        time.sleep(sleeptime)
-        sleeptime = min(30.0, sleeptime*1.5)
-    
     # this will be done in parallel in all nodes
     # this call only spawns the process
     node.install_dependencies()
diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py
index 8557a95..9e56c32 100644
@@ -10,8 +10,14 @@ import os
 import collections
 import cStringIO
 import resourcealloc
+import socket
+import sys
 
 from nepi.util import server
+from nepi.util import parallel
+
+class UnresponsiveNodeError(RuntimeError):
+    pass
 
 class Node(object):
     BASEFILTERS = {
@@ -101,6 +107,8 @@ class Node(object):
         )
     
     def find_candidates(self, filter_slice_id=None):
+        print >>sys.stderr, "Finding candidates for", self.make_filter_description()
+        
         fields = ('node_id',)
         replacements = {'timeframe':self.timeframe}
         
@@ -113,6 +121,8 @@ class Node(object):
         # only pick healthy nodes
         basefilters['run_level'] = 'boot'
         basefilters['boot_state'] = 'boot'
+        basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
+        basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
         
         # keyword-only "pseudofilters"
         extra = {}
@@ -182,6 +192,24 @@ class Node(object):
                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
             
             candidates = set(filter(predicate, candidates))
+        
+        # make sure hostnames are resolvable
+        if candidates:
+            print >>sys.stderr, "  Found", len(candidates), "candidates. Checking for reachability..."
+            
+            hostnames = dict(map(operator.itemgetter('node_id','hostname'),
+                self._api.GetNodes(list(candidates), ['node_id','hostname'])
+            ))
+            def resolvable(node_id):
+                try:
+                    # gethostbyname raises socket.gaierror (a socket.error) on failure
+                    socket.gethostbyname(hostnames[node_id])
+                    return True
+                except socket.error:
+                    return False
+            candidates = set(parallel.pfilter(resolvable, candidates,
+                maxthreads = 16))
+
+            print >>sys.stderr, "  Found", len(candidates), "reachable candidates."
             
         return candidates
     
@@ -225,11 +253,19 @@ class Node(object):
         self._node_id = node_id
         self.fetch_node_info()
     
+    def unassign_node(self):
+        self._node_id = None
+        self.__dict__.update(self.__orig_attrs)
+    
     def fetch_node_info(self):
+        orig_attrs = {}
+        
         info = self._api.GetNodes(self._node_id)[0]
         tags = dict( (t['tagname'],t['value'])
                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
 
+        orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
+        orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
@@ -238,14 +274,19 @@ class Node(object):
         for attr, tag in self.BASEFILTERS.iteritems():
             if tag in info:
                 value = info[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
                 setattr(self, attr, value)
         for attr, (tag,_) in self.TAGFILTERS.iteritems():
             tag = tag % replacements
             if tag in tags:
                 value = tags[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
                 setattr(self, attr, value)
         
         if 'peer_id' in info:
+            orig_attrs['site'] = self.site
             self.site = self._api.peer_map[info['peer_id']]
         
         if 'interface_ids' in info:
@@ -253,7 +294,10 @@ class Node(object):
             self.max_num_external_ifaces = len(info['interface_ids'])
         
         if 'ssh_rsa_key' in info:
+            orig_attrs['server_key'] = self.server_key
             self.server_key = info['ssh_rsa_key']
+        
+        self.__orig_attrs = orig_attrs
 
     def validate(self):
         if self.home_path is None:
@@ -291,6 +335,21 @@ class Node(object):
             if proc.wait():
                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
     
+    def wait_provisioning(self):
+        # recently provisioned nodes may not be up yet
+        sleeptime = 1.0
+        totaltime = 0.0
+        while not self.is_alive():
+            time.sleep(sleeptime)
+            totaltime += sleeptime
+            sleeptime = min(30.0, sleeptime*1.5)
+            
+            if totaltime > 20*60:
+                # PlanetLab can take up to 15 minutes to propagate
+                # configuration changes; past 20 minutes, the unresponsiveness
+                # cannot be blamed on that delay, so give up.
+                raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+    
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
         if self.required_packages:
             pidfile = self.DEPENDS_PIDFILE
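wait_provisioning above polls is_alive() with a capped exponential backoff
(1s, 1.5s, 2.25s, ... capped at 30s) and gives up once 20 minutes have
elapsed, comfortably past PlanetLab's 15-minute propagation window. A
self-contained sketch of that schedule, where probe is a hypothetical
stand-in for Node.is_alive:

    import time

    def wait_until(probe, initial=1.0, factor=1.5, cap=30.0, timeout=20*60):
        # Poll with capped exponential backoff, failing after "timeout"
        # seconds of cumulative sleeping.
        sleeptime, totaltime = initial, 0.0
        while not probe():
            time.sleep(sleeptime)
            totaltime += sleeptime
            sleeptime = min(cap, sleeptime * factor)
            if totaltime > timeout:
                raise RuntimeError("unresponsive after %.0fs" % totaltime)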
diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py
index d1169a5..abf2202 100644
@@ -88,4 +88,49 @@ class ParallelMap(object):
                         raise StopIteration
             
     
+class ParallelFilter(ParallelMap):
+    class _FILTERED:
+        pass
     
+    def __filter(self, x):
+        if self.filter_condition(x):
+            return x
+        else:
+            return self._FILTERED
+    
+    def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
+        super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
+        self.filter_condition = filter_condition
+
+    def put(self, what):
+        super(ParallelFilter, self).put(self.__filter, what)
+    
+    def put_nowait(self, what):
+        super(ParallelFilter, self).put_nowait(self.__filter, what)
+        
+    def __iter__(self):
+        for rv in super(ParallelFilter, self).__iter__():
+            if rv is not self._FILTERED:
+                yield rv
+
+
+def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
+    mapper = ParallelMap(
+        maxthreads = maxthreads,
+        maxqueue = maxqueue,
+        results = True)
+    mapper.start()
+    for elem in iterable:
+        mapper.put(mapping, elem)
+    return list(mapper)
+
+def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
+    filtrer = ParallelFilter(
+        condition,
+        maxthreads = maxthreads,
+        maxqueue = maxqueue)
+    filtrer.start()
+    for elem in iterable:
+        filtrer.put(elem)
+    return list(filtrer)
+
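Assuming ParallelMap.put(callable, *args) queues work for a thread pool and
iterating yields results (not necessarily in input order), pmap and pfilter
behave like concurrent versions of the map and filter builtins. For example,
the hostname-resolvability check in node.py boils down to something like
this (hostnames here is example data):

    import socket
    from nepi.util import parallel

    hostnames = ["planetlab1.example.org", "no-such-host.invalid"]

    def resolvable(hostname):
        try:
            socket.gethostbyname(hostname)
            return True
        except socket.error:
            return False

    # Resolve up to 16 hostnames concurrently, keeping only the good ones.
    reachable = parallel.pfilter(resolvable, hostnames, maxthreads = 16)

    # pmap works the same way for transformations.
    lengths = parallel.pmap(len, reachable, maxthreads = 16)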