Ticket #71: inner parallelization of setup phases
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 19 Jul 2011 14:46:35 +0000 (16:46 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 19 Jul 2011 14:46:35 +0000 (16:46 +0200)
src/nepi/core/metadata.py
src/nepi/core/testbed_impl.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/util/parallel.py

index 83b408e..d84ccbd 100644 (file)
@@ -11,6 +11,11 @@ from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
         DeploymentConfiguration as DC, \
         AttributeCategories as AC
 
+class Parallel(object):
+    def __init__(self, factory, maxthreads = 16):
+        self.factory = factory
+        self.maxthreads = maxthreads
+
 class MetadataInfo(object):
     @property
     def connector_types(self):
@@ -69,21 +74,24 @@ class MetadataInfo(object):
     @property
     def create_order(self):
         """ list of factory ids that indicates the order in which the elements
-        should be instantiated.
+        should be instantiated. If wrapped within a Parallel instance, they
+        will be instantiated in parallel.
         """
         raise NotImplementedError
 
     @property
     def configure_order(self):
         """ list of factory ids that indicates the order in which the elements
-        should be configured.
+        should be configured. If wrapped within a Parallel instance, they
+        will be configured in parallel.
         """
         raise NotImplementedError
 
     @property
     def preconfigure_order(self):
         """ list of factory ids that indicates the order in which the elements
-        should be preconfigured.
+        should be preconfigured. If wrapped within a Parallel instance, they
+        will be configured in parallel.
         
         Default: same as configure_order
         """
@@ -92,7 +100,8 @@ class MetadataInfo(object):
     @property
     def prestart_order(self):
         """ list of factory ids that indicates the order in which the elements
-        should be prestart-configured.
+        should be prestart-configured. If wrapped within a Parallel instance, they
+        will be configured in parallel.
         
         Default: same as configure_order
         """
@@ -101,7 +110,8 @@ class MetadataInfo(object):
     @property
     def start_order(self):
         """ list of factory ids that indicates the order in which the elements
-        should be started.
+        should be started. If wrapped within a Parallel instance, they
+        will be started in parallel.
         
         Default: same as configure_order
         """
index 8b49c8e..f496f4a 100644 (file)
@@ -2,12 +2,13 @@
 # -*- coding: utf-8 -*-
 
 from nepi.core import execute
-from nepi.core.metadata import Metadata
+from nepi.core.metadata import Metadata, Parallel
 from nepi.util import validation
 from nepi.util.constants import TIME_NOW, \
         ApplicationStatus as AS, \
         TestbedStatus as TS, \
         CONNECTION_DELAY
+from nepi.util.parallel import ParallelRun
 
 import collections
 import copy
@@ -216,18 +217,40 @@ class TestbedController(execute.TestbedController):
         # order guids (elements) according to factory_id
         for guid, factory_id in self._create.iteritems():
             guids[factory_id].append(guid)
+        
         # configure elements following the factory_id order
         for factory_id in order:
+            # Create a parallel runner if we're given a Parallel() wrapper
+            runner = None
+            if isinstance(factory_id, Parallel):
+                runner = ParallelRun(factory_id.maxthreads)
+                factory_id = factory_id.factory
+            
             # omit the factories that have no element to create
             if factory_id not in guids:
                 continue
+            
+            # configure action
             factory = self._factories[factory_id]
             if not getattr(factory, action):
                 continue
-            for guid in guids[factory_id]:
+            def perform_action(guid):
                 getattr(factory, action)(self, guid)
                 if postaction:
                     postaction(self, guid)
+
+            # perform the action on all elements, in parallel if so requested
+            if runner:
+                runner.start()
+            for guid in guids[factory_id]:
+                if runner:
+                    runner.put(perform_action, guid)
+                else:
+                    perform_action(guid)
+            if runner:
+                runner.join()
+            
+            # post hook
             if poststep:
                 for guid in guids[factory_id]:
                     poststep(self, guid)
index 1420601..6714f2b 100644 (file)
@@ -7,6 +7,7 @@ from nepi.util.constants import TIME_NOW
 from nepi.util.graphtools import mst
 from nepi.util import ipaddr2
 from nepi.util import environ
+from nepi.util.parallel import ParallelRun
 import sys
 import os
 import os.path
@@ -467,14 +468,23 @@ class TestbedController(testbed_impl.TestbedController):
     def shutdown(self):
         for trace in self._traces.itervalues():
             trace.close()
+        
+        runner = ParallelRun(16)
+        runner.start()
         for element in self._elements.itervalues():
             # invoke cleanup hooks
             if hasattr(element, 'cleanup'):
-                element.cleanup()
+                runner.put(element.cleanup)
+        runner.join()
+        
+        runner = ParallelRun(16)
+        runner.start()
         for element in self._elements.itervalues():
             # invoke destroy hooks
             if hasattr(element, 'destroy'):
-                element.destroy()
+                runner.put(element.destroy)
+        runner.join()
+        
         self._elements.clear()
         self._traces.clear()
 
index f2672e8..16646a5 100644 (file)
@@ -5,6 +5,7 @@ import time
 
 from constants import TESTBED_ID, TESTBED_VERSION
 from nepi.core import metadata
+from nepi.core.metadata import Parallel
 from nepi.core.attributes import Attribute
 from nepi.util import tags, validation
 from nepi.util.constants import ApplicationStatus as AS, \
@@ -937,10 +938,10 @@ traces = dict({
 
 create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
 
-configure_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
 
 # Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes
-start_order = [ INTERNET, NODEIFACE, TAPIFACE, TUNIFACE, NODE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+start_order = [ INTERNET, NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NODE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
 
 factories_info = dict({
     NODE: dict({
index abf2202..53ae8f1 100644 (file)
@@ -32,6 +32,8 @@ class ParallelMap(object):
         self.workers = [ threading.Thread(target = self.worker) 
                          for x in xrange(maxthreads) ]
         
+        self.delayed_exceptions = []
+        
         if results:
             self.rvqueue = Queue.Queue()
         else:
@@ -56,6 +58,10 @@ class ParallelMap(object):
         for thread in self.workers:
             thread.join()
         
+        if self.delayed_exceptions:
+            typ,val,loc = self.delayed_exceptions[0]
+            raise typ,val,loc
+        
     def worker(self):
         while True:
             task = self.queue.get()
@@ -74,6 +80,7 @@ class ParallelMap(object):
                     self.queue.task_done()
             except:
                 traceback.print_exc(file = sys.stderr)
+                self.delayed_exceptions.apped(sys.exc_info())
 
     def __iter__(self):
         if self.rvqueue is not None:
@@ -113,6 +120,20 @@ class ParallelFilter(ParallelMap):
             if rv is not self._FILTERED:
                 yield rv
 
+class ParallelRun(ParallelMap):
+    def __run(self, x):
+        fn, args, kwargs = x
+        return fn(*args, **kwargs)
+    
+    def __init__(self, maxthreads = None, maxqueue = None):
+        super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
+
+    def put(self, what, *args, **kwargs):
+        super(ParallelRun, self).put(self.__run, (what, args, kwargs))
+    
+    def put_nowait(self, what, *args, **kwargs):
+        super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
+
 
 def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
     mapper = ParallelMap(