X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=d6fcf8715f09ace3d5663fa9a78ae791663e15a9;hb=b1c9e8465031caee7ca146a60be2d31fbe781bbb;hp=42bb138700fdf2c71edf364fbc5c331a53b957ec;hpb=4896d77f40a611a22f9f1f8f2ae0e63e9008fee1;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 42bb1387..d6fcf871 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -17,6 +17,7 @@ """ +import functools import logging import os import random @@ -285,71 +286,59 @@ class ExperimentController(object): """ self.logger.debug(" ------- DEPLOY START ------ ") - stop = [] - - def steps(rm): - try: - rm.deploy() - rm.start_with_conditions() - - # Only if the RM has STOP conditions we - # schedule a stop. Otherwise the RM will stop immediately - if rm.conditions.get(ResourceAction.STOP): - rm.stop_with_conditions() - except: - import traceback - err = traceback.format_exc() - - self._logger.error("Error occurred while deploying resources: %s" % err) - - # stop deployment - stop.append(None) - if not group: group = self.resources # Before starting deployment we disorder the group list with the # purpose of speeding up the whole deployment process. # It is likely that the user inserted in the 'group' list closely - # resources resources one after another (e.g. all applications + # resources one after another (e.g. all applications # connected to the same node can likely appear one after another). # This can originate a slow down in the deployment since the N # threads the parallel runner uses to processes tasks may all # be taken up by the same family of resources waiting for the - # same conditions. - # If we disorder the group list, this problem can be mitigated + # same conditions (e.g. LinuxApplications running on a same + # node share a single lock, so they will tend to be serialized). + # If we disorder the group list, this problem can be mitigated. random.shuffle(group) - threads = [] + def wait_all_and_start(group): + reschedule = False + for guid in group: + rm = self.get_resource(guid) + if rm.state < ResourceState.READY: + reschedule = True + break + + if reschedule: + callback = functools.partial(wait_all_and_start, group) + self.schedule("1s", callback) + else: + # If all resources are read, we schedule the start + for guid in group: + rm = self.get_resource(guid) + self.schedule("0.01s", rm.start_with_conditions) + + if wait_all_ready: + # Schedule the function that will check all resources are + # READY, and only then it will schedule the start. + # This is aimed to reduce the number of tasks looping in the scheduler. + # Intead of having N start tasks, we will have only one + callback = functools.partial(wait_all_and_start, group) + self.schedule("1s", callback) + for guid in group: rm = self.get_resource(guid) + self.schedule("0.001s", rm.deploy) - if wait_all_ready: - towait = list(group) - towait.remove(guid) - self.register_condition(guid, ResourceAction.START, - towait, ResourceState.READY) + if not wait_all_ready: + self.schedule("1s", rm.start_with_conditions) - thread = threading.Thread(target = steps, args = (rm,)) - threads.append(thread) - thread.setDaemon(True) - thread.start() - - while list(threads) and not self.finished and not stop: - thread = threads[0] - # Time out after 5 seconds to check EC not terminated - thread.join(1) - if not thread.is_alive(): - threads.remove(thread) - - if stop: - # stop the scheduler - self._stop_scheduler() - - if self._thread.is_alive(): - self._thread.join() + if rm.conditions.get(ResourceAction.STOP): + # Only if the RM has STOP conditions we + # schedule a stop. Otherwise the RM will stop immediately + self.schedule("2s", rm.stop_with_conditions) - raise RuntimeError, "Error occurred, interrupting deployment " def release(self, group = None): if not group: @@ -369,7 +358,7 @@ class ExperimentController(object): thread.join(5) if not thread.is_alive(): threads.remove(thread) - + def shutdown(self): self.release() @@ -417,7 +406,7 @@ class ExperimentController(object): self._cond.acquire() task = self._scheduler.next() self._cond.release() - + if not task: # It there are not tasks in the tasks queue we need to # wait until a call to schedule wakes us up @@ -440,18 +429,17 @@ class ExperimentController(object): else: # Process tasks in parallel runner.put(self._execute, task) - except: import traceback err = traceback.format_exc() self._logger.error("Error while processing tasks in the EC: %s" % err) self._state = ECState.FAILED - finally: - runner.sync() # Mark EC state as terminated if self.ecstate == ECState.RUNNING: + # Synchronize to get errors if occurred + runner.sync() self._state = ECState.TERMINATED def _execute(self, task):