From 67a5d232414a375c224e06058fbef0ae63be58c9 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Thu, 8 Sep 2011 12:49:34 +0200 Subject: [PATCH] Fix shutdown order to respect creation order (important when running nepi-in-nepi) --- src/nepi/core/execute.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index 1c51d58f..cf32e6a4 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -241,6 +241,7 @@ class ExperimentController(object): self._failed_testbeds = set() self._started_time = None self._stopped_time = None + self._testbed_order = [] self._logger = logging.getLogger('nepi.core.execute') level = logging.ERROR @@ -323,13 +324,17 @@ class ExperimentController(object): def _parallel(callables): excs = [] def wrap(callable): - @functools.wraps(callable) def wrapped(*p, **kw): try: callable(*p, **kw) except: logging.exception("Exception occurred in asynchronous thread:") excs.append(sys.exc_info()) + try: + wrapped = functools.wraps(callable)(wrapped) + except: + # functools.partial not wrappable + pass return wrapped threads = [ threading.Thread(target=wrap(callable)) for callable in callables ] for thread in threads: @@ -403,6 +408,9 @@ class ExperimentController(object): for guid,testbed in self._testbeds.iteritems() if guid in allowed_guids]) self._clear_caches() + + # Store testbed order + self._testbed_order.append(allowed_guids) steps_to_configure(self, to_restart) @@ -693,11 +701,26 @@ class ExperimentController(object): def shutdown(self): exceptions = list() - for testbed in self._testbeds.values(): + ordered_testbeds = set() + + def shutdown_testbed(guid): try: + testbed = self._testbeds[guid] + ordered_testbeds.add(guid) testbed.shutdown() except: exceptions.append(sys.exc_info()) + + self._logger.debug("ExperimentController: Starting parallel shutdown") + + for testbed_guids in reversed(self._testbed_order): + self._parallel([functools.partial(shutdown_testbed, guid) + for guid in testbed_guids]) + remaining_guids = set(self._testbeds) - ordered_testbeds + if remaining_guids: + self._parallel([functools.partial(shutdown_testbed, guid) + for guid in remaining_guids]) + for exc_info in exceptions: raise exc_info[0], exc_info[1], exc_info[2] -- 2.45.2