Fix shutdown order to respect creation order (important when running nepi-in-nepi)
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 8 Sep 2011 10:49:34 +0000 (12:49 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 8 Sep 2011 10:49:34 +0000 (12:49 +0200)
src/nepi/core/execute.py

index 1c51d58..cf32e6a 100644 (file)
@@ -241,6 +241,7 @@ class ExperimentController(object):
         self._failed_testbeds = set()
         self._started_time = None
         self._stopped_time = None
+        self._testbed_order = []
       
         self._logger = logging.getLogger('nepi.core.execute')
         level = logging.ERROR
@@ -323,13 +324,17 @@ class ExperimentController(object):
     def _parallel(callables):
         excs = []
         def wrap(callable):
-            @functools.wraps(callable)
             def wrapped(*p, **kw):
                 try:
                     callable(*p, **kw)
                 except:
                     logging.exception("Exception occurred in asynchronous thread:")
                     excs.append(sys.exc_info())
+            try:
+                wrapped = functools.wraps(callable)(wrapped)
+            except:
+                # functools.partial not wrappable
+                pass
             return wrapped
         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
         for thread in threads:
@@ -403,6 +408,9 @@ class ExperimentController(object):
                             for guid,testbed in self._testbeds.iteritems()
                             if guid in allowed_guids])
             self._clear_caches()
+            
+            # Store testbed order
+            self._testbed_order.append(allowed_guids)
 
         steps_to_configure(self, to_restart)
 
@@ -693,11 +701,26 @@ class ExperimentController(object):
 
     def shutdown(self):
         exceptions = list()
-        for testbed in self._testbeds.values():
+        ordered_testbeds = set()
+
+        def shutdown_testbed(guid):
             try:
+                testbed = self._testbeds[guid]
+                ordered_testbeds.add(guid)
                 testbed.shutdown()
             except:
                 exceptions.append(sys.exc_info())
+                
+        self._logger.debug("ExperimentController: Starting parallel shutdown")
+        
+        for testbed_guids in reversed(self._testbed_order):
+            self._parallel([functools.partial(shutdown_testbed, guid)
+                            for guid in testbed_guids])
+        remaining_guids = set(self._testbeds) - ordered_testbeds
+        if remaining_guids:
+            self._parallel([functools.partial(shutdown_testbed, guid)
+                            for guid in remaining_guids])
+            
         for exc_info in exceptions:
             raise exc_info[0], exc_info[1], exc_info[2]