self._failed_testbeds = set()
self._started_time = None
self._stopped_time = None
+ self._testbed_order = []
self._logger = logging.getLogger('nepi.core.execute')
level = logging.ERROR
def _parallel(callables):
excs = []
def wrap(callable):
- @functools.wraps(callable)
def wrapped(*p, **kw):
try:
callable(*p, **kw)
except:
logging.exception("Exception occurred in asynchronous thread:")
excs.append(sys.exc_info())
+ try:
+ wrapped = functools.wraps(callable)(wrapped)
+ except:
+ # functools.partial not wrappable
+ pass
return wrapped
threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
for thread in threads:
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
self._clear_caches()
+
+ # Store testbed order
+ self._testbed_order.append(allowed_guids)
steps_to_configure(self, to_restart)
def shutdown(self):
exceptions = list()
- for testbed in self._testbeds.values():
+ ordered_testbeds = set()
+
+ def shutdown_testbed(guid):
try:
+ testbed = self._testbeds[guid]
+ ordered_testbeds.add(guid)
testbed.shutdown()
except:
exceptions.append(sys.exc_info())
+
+ self._logger.debug("ExperimentController: Starting parallel shutdown")
+
+ for testbed_guids in reversed(self._testbed_order):
+ self._parallel([functools.partial(shutdown_testbed, guid)
+ for guid in testbed_guids])
+ remaining_guids = set(self._testbeds) - ordered_testbeds
+ if remaining_guids:
+ self._parallel([functools.partial(shutdown_testbed, guid)
+ for guid in remaining_guids])
+
for exc_info in exceptions:
raise exc_info[0], exc_info[1], exc_info[2]