X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fneco%2Fexecution%2Fec.py;h=65ef175f3bbe1630a0b3bfddfbf3ac7f172040e5;hb=94f4bad47f6b00e8d1e9ad9c55bfbbd6894329fc;hp=b793d0e4424a3c732c839b95b5c5b24ca0b3491e;hpb=d32dba78910bd348b9bbeb0e8242d31bfd39c0a7;p=nepi.git

diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py
index b793d0e4..65ef175f 100644
--- a/src/neco/execution/ec.py
+++ b/src/neco/execution/ec.py
@@ -1,5 +1,6 @@
 import logging
 import os
+import random
 import sys
 import time
 import threading
@@ -14,6 +15,7 @@ from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 # TODO: Improve speed. Too slow... !!
+# TODO: When something fails during deployment, NECO leaves scp and ssh processes running behind!!
 
 class ECState(object):
     RUNNING = 1
@@ -265,18 +267,41 @@ class ExperimentController(object):
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
+        stop = []
+
         def steps(rm):
-            rm.deploy()
-            rm.start_with_conditions()
+            try:
+                rm.deploy()
+                rm.start_with_conditions()
+
+                # Only if the RM has STOP conditions do we schedule
+                # a stop. Otherwise the RM would stop immediately.
+                if rm.conditions.get(ResourceAction.STOP):
+                    rm.stop_with_conditions()
+            except:
+                import traceback
+                err = traceback.format_exc()
+
+                self._logger.error("Error occurred while deploying resources: %s" % err)
 
-            # Only if the RM has STOP consitions we 
-            # schedule a stop. Otherwise the RM will stop immediately
-            if rm.conditions.get(ResourceAction.STOP):
-                rm.stop_with_conditions()
+                # stop deployment
+                stop.append(None)
 
         if not group:
            group = self.resources
 
+        # Before starting the deployment we shuffle the group list in
+        # order to speed up the whole deployment process.
+        # The user will likely have inserted related resources in the
+        # 'group' list one after another (e.g. all applications
+        # connected to the same node will probably appear consecutively).
+        # This can slow down the deployment, since the N threads the
+        # parallel runner uses to process tasks may all be taken up by
+        # the same family of resources, waiting for the same conditions.
+        # Shuffling the group list spreads the work across unrelated
+        # resources and mitigates this problem.
+        random.shuffle(group)
+
         threads = []
         for guid in group:
             rm = self.get_resource(guid)
@@ -292,13 +317,22 @@ class ExperimentController(object):
             thread.setDaemon(True)
             thread.start()
 
-        while list(threads) and not self.finished:
+        while list(threads) and not self.finished and not stop:
             thread = threads[0]
             # Time out after 5 seconds to check EC not terminated
-            thread.join(5)
+            thread.join(1)
             if not thread.is_alive():
                 threads.remove(thread)
 
+        if stop:
+            # stop the scheduler
+            self._stop_scheduler()
+
+            if self._thread.is_alive():
+                self._thread.join()
+
+            raise RuntimeError, "Error occurred, interrupting deployment "
+
     def release(self, group = None):
         if not group:
             group = self.resources
@@ -317,16 +351,12 @@ class ExperimentController(object):
                 thread.join(5)
                 if not thread.is_alive():
                     threads.remove(thread)
-
-        self._state = ECState.TERMINATED
 
     def shutdown(self):
         self.release()
-
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
+        self._stop_scheduler()
+
         if self._thread.is_alive():
             self._thread.join()
 
 
@@ -399,7 +429,8 @@ class ExperimentController(object):
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-            return
+        finally:
+            runner.sync()
 
         # Mark EC state as terminated
         if self.ecstate == ECState.RUNNING:
@@ -419,14 +450,18 @@ class ExperimentController(object):
             self._logger.error("Error occurred while executing task: %s" % err)
 
-            # Mark the EC as failed
-            self._state = ECState.FAILED
-
-            # Wake up the EC in case it was sleeping
-            self._cond.acquire()
-            self._cond.notify()
-            self._cond.release()
+            self._stop_scheduler()
 
             # Propage error to the ParallelRunner
             raise
 
+    def _stop_scheduler(self):
+        # Mark the EC as failed
+        self._state = ECState.FAILED
+
+        # Wake up the EC in case it was sleeping
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+
+
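The core of this change is the stop-flag pattern in deploy(): every resource is deployed in its own daemon thread, any failure appends to a shared stop list, and the join loop polls that list with a short timeout so it can abort early and raise. The sketch below is a minimal, self-contained illustration of the same pattern; it is not the NEPI/NECO API, and DummyRM and deploy_group() are hypothetical stand-ins used only for demonstration.

# Minimal sketch (assumed names, not the NEPI/NECO API) of the stop-flag
# deployment pattern introduced by this diff.
import random
import threading
import traceback

class DummyRM(object):
    """Stand-in resource manager whose deploy() may raise."""
    def __init__(self, guid, fail=False):
        self.guid = guid
        self.fail = fail

    def deploy(self):
        if self.fail:
            raise RuntimeError("deployment of resource %d failed" % self.guid)

def deploy_group(group):
    stop = []  # shared abort flag: non-empty means "interrupt deployment"

    def steps(rm):
        try:
            rm.deploy()
        except Exception:
            print(traceback.format_exc())
            stop.append(None)  # signal the join loop to bail out

    # Shuffle so that related resources (usually listed consecutively and
    # waiting on the same conditions) do not monopolize the worker threads.
    random.shuffle(group)

    threads = []
    for rm in group:
        t = threading.Thread(target=steps, args=(rm,))
        t.daemon = True
        t.start()
        threads.append(t)

    # Join with a short timeout so the abort flag is re-checked periodically.
    while threads and not stop:
        t = threads[0]
        t.join(1)
        if not t.is_alive():
            threads.remove(t)

    if stop:
        raise RuntimeError("Error occurred, interrupting deployment")

if __name__ == "__main__":
    rms = [DummyRM(guid) for guid in range(5)] + [DummyRM(99, fail=True)]
    deploy_group(rms)

A plain list works as the abort flag here because list.append is atomic under CPython's GIL; the patch relies on the same trick with stop.append(None) instead of introducing a lock or an Event.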