Parallelize node liveness tests
author Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 27 Sep 2011 02:02:34 +0000 (04:02 +0200)
committer Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 27 Sep 2011 02:02:34 +0000 (04:02 +0200)
src/nepi/testbeds/planetlab/execute.py

index dcb5023..309dca0 100644
@@ -9,6 +9,7 @@ from nepi.util.graphtools import mst
 from nepi.util import ipaddr2
 from nepi.util import environ
 from nepi.util.parallel import ParallelRun
+import threading
 import sys
 import os
 import os.path
@@ -213,45 +214,76 @@ class TestbedController(testbed_impl.TestbedController):
         # Initial algo:
         #   look for perfectly defined nodes
         #   (ie: those with only one candidate)
-        for guid, node in self._elements.iteritems():
-            if isinstance(node, self._node.Node) and node._node_id is None:
-                # Try existing nodes first
-                # If we have only one candidate, simply use it
-                candidates = node.find_candidates(
-                    filter_slice_id = self.slice_id)
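+        # serializes access to the shared 'reserved' / 'to_provision' sets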
+        reserve_lock = threading.Lock()
+        def assignifunique(guid, node):
+            # Try existing nodes first
+            # If we have only one candidate, simply use it
+            candidates = node.find_candidates(
+                filter_slice_id = self.slice_id)
+            
+            node_id = None
+            reserve_lock.acquire()
+            try:
                 candidates -= reserved
                 if len(candidates) == 1:
                     node_id = iter(candidates).next()
-                    node.assign_node_id(node_id)
                     reserved.add(node_id)
                 elif not candidates:
                     # Try again including unassigned nodes
-                    candidates = node.find_candidates()
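+                    # drop the lock across the second (slow) PLC query
+                    # so other workers can make progress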
+                    reserve_lock.release()
+                    try:
+                        candidates = node.find_candidates()
+                    finally:
+                        reserve_lock.acquire()
                     candidates -= reserved
                     if len(candidates) > 1:
-                        continue
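+                        # ambiguous: let the backtracking pass below decide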
+                        return
                     if len(candidates) == 1:
                         node_id = iter(candidates).next()
-                        node.assign_node_id(node_id)
                         to_provision.add(node_id)
                         reserved.add(node_id)
                     elif not candidates:
                         raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                             node.make_filter_description())
+            finally:
+                reserve_lock.release()
+            
+            if node_id is not None:
+                node.assign_node_id(node_id)
+        
+        runner = ParallelRun(maxthreads=4) # don't overload the PLC API; 4 threads are enough to hide latencies
+        runner.start()
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is None:
+                runner.put(assignifunique, guid, node)
+        runner.sync()
         
         # Now do the backtracking search for a suitable solution
         # First with existing slice nodes
         reqs = []
         nodes = []
+        genreqs_lock = threading.Lock()
+        def genreqs(node, filter_slice_id=None):
+            # Collect each node's candidate set; the backtracking
+            # allocator below will pick a consistent assignment
+            candidates = node.find_candidates(
+                filter_slice_id = filter_slice_id)
+            candidates -= reserved
+            # append to both lists under one lock so reqs[i] and
+            # nodes[i] stay paired when workers interleave
+            genreqs_lock.acquire()
+            try:
+                reqs.append(candidates)
+                nodes.append(node)
+            finally:
+                genreqs_lock.release()
         for guid, node in self._elements.iteritems():
             if isinstance(node, self._node.Node) and node._node_id is None:
-                # Try existing nodes first
-                # If we have only one candidate, simply use it
-                candidates = node.find_candidates(
-                    filter_slice_id = self.slice_id)
-                candidates -= reserved
-                reqs.append(candidates)
-                nodes.append(node)
+                runner.put(genreqs, node, self.slice_id)
+        runner.sync()
         
         if nodes and reqs:
             if recover:
@@ -272,16 +294,15 @@ class TestbedController(testbed_impl.TestbedController):
                 # Failed, try again with all nodes
                 reqs = []
                 for node in nodes:
-                    candidates = node.find_candidates()
-                    candidates -= reserved
-                    reqs.append(candidates)
-                
+                    runner.put(genreqs, node)
+                runner.sync()
                 solution = resourcealloc.alloc(reqs, sample=pickbest)
                 to_provision.update(solution)
             
             # Do assign nodes
             for node, node_id in zip(nodes, solution):
-                node.assign_node_id(node_id)
+                runner.put(node.assign_node_id, node_id)
+            runner.join()
 
     def do_provisioning(self):
         if self._to_provision:
@@ -304,12 +325,14 @@ class TestbedController(testbed_impl.TestbedController):
             
                 # Show the magic
                 self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
-            
+        
         try:
-            for guid, node in self._elements.iteritems():
-                if isinstance(node, self._node.Node):
-                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
-                    
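+            # maxqueue=1 keeps the loop below in step with the workers,
+            # so 'abort' is noticed promptly after a failure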
+            runner = ParallelRun(maxqueue=1)
+            abort = []
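+            # workers append here on failure; the feeding loop below
+            # checks it and stops queuing more nodes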
+            def waitforit(guid, node):
+                try:
                     node.wait_provisioning(
                         (20*60 if node._node_id in self._just_provisioned else 60)
                     )
@@ -318,9 +339,21 @@ class TestbedController(testbed_impl.TestbedController):
                     
                     # Prepare dependency installer now
                     node.prepare_dependencies()
+                except:
+                    abort.append(None)
+                    raise
+                
+            for guid, node in self._elements.iteritems():
+                if abort:
+                    break
+                if isinstance(node, self._node.Node):
+                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
+                    runner.put(waitforit, guid, node)
+            runner.join()
+                    
         except self._node.UnresponsiveNodeError:
             # Uh... 
-            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
+            self._logger.warn("UNRESPONSIVE Nodes")
             
             # Mark all dead nodes (which are unresponsive) on the blacklist
             # and re-raise
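
Note on ParallelRun: the hunks above exercise only a small surface of nepi.util.parallel.ParallelRun (the maxthreads / maxqueue constructor arguments plus start(), put(), sync() and join()). The real implementation lives in nepi/util/parallel.py and may differ in detail; the MiniRun class below is a hypothetical stand-in, written only to illustrate the pool pattern this change relies on. Two behaviours are guesses from usage: workers start lazily on the first put() (the second hunk constructs ParallelRun(maxqueue=1) and never calls start()), and exceptions raised in workers resurface to the caller (the try/except around runner.join() expects UnresponsiveNodeError to propagate); the sketch implements the first and omits the second for brevity.

    import threading
    import Queue  # Python 2 stdlib, matching the codebase

    class MiniRun(object):
        # Hypothetical stand-in for ParallelRun: same surface
        # (start/put/sync/join, maxthreads, maxqueue), internals guessed.
        def __init__(self, maxthreads=4, maxqueue=0):
            # maxqueue=0 means unbounded; maxqueue=1 (second hunk)
            # gives backpressure against the feeding loop
            self._queue = Queue.Queue(maxqueue)
            self._maxthreads = maxthreads
            self._workers = []

        def start(self):
            for _ in xrange(self._maxthreads):
                t = threading.Thread(target=self._run)
                t.daemon = True
                t.start()
                self._workers.append(t)

        def _run(self):
            while True:
                task = self._queue.get()
                try:
                    if task is None:  # poison pill from join()
                        break
                    func, args = task
                    func(*args)
                finally:
                    self._queue.task_done()

        def put(self, func, *args):
            if not self._workers:
                self.start()  # lazy start (assumption, see note above)
            self._queue.put((func, args))

        def sync(self):
            # Block until every queued task has run; the pool stays
            # alive for further put() calls (used between phases above).
            self._queue.join()

        def join(self):
            # Drain outstanding work, then shut the workers down.
            self.sync()
            for _ in self._workers:
                self._queue.put(None)
            for t in self._workers:
                t.join()
            self._workers = []

Usage then mirrors the first hunk: create the pool with maxthreads=4, call put(assignifunique, guid, node) once per unassigned node, and sync() before the main thread reads the shared reserved set.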