X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fneco%2Fexecution%2Fec.py;h=ba064d490c3ed237217117eb9649772948162742;hb=6337302c0db631641b3e6a47c6e57c4864711acc;hp=c9e4e069941fa4928ddae7e2eaa247ef891a90be;hpb=b849b5d9bd2569b1db4dce2a65296e1c619bd0a7;p=nepi.git diff --git a/src/neco/execution/ec.py b/src/neco/execution/ec.py index c9e4e069..ba064d49 100644 --- a/src/neco/execution/ec.py +++ b/src/neco/execution/ec.py @@ -13,6 +13,12 @@ from neco.execution.scheduler import HeapScheduler, Task, TaskStatus from neco.execution.trace import TraceAttr # TODO: use multiprocessing instead of threading +# TODO: Improve speed. Too slow... !! + +class ECState(object): + RUNNING = 1 + FAILED = 2 + TERMINATED = 3 class ExperimentController(object): def __init__(self, exp_id = None, root_dir = "/tmp"): @@ -29,9 +35,6 @@ class ExperimentController(object): # Resource managers self._resources = dict() - # Resource managers - self._group = dict() - # Scheduler self._scheduler = HeapScheduler() @@ -39,11 +42,14 @@ class ExperimentController(object): self._tasks = dict() # Event processing thread - self._stop = False self._cond = threading.Condition() self._thread = threading.Thread(target = self._process) + self._thread.setDaemon(True) self._thread.start() + # EC state + self._state = ECState.RUNNING + # Logging self._logger = logging.getLogger("ExperimentController") @@ -51,6 +57,10 @@ class ExperimentController(object): def logger(self): return self._logger + @property + def ecstate(self): + return self._state + @property def exp_id(self): exp_id = self._exp_id @@ -58,6 +68,15 @@ class ExperimentController(object): exp_id = "nepi-" + exp_id return exp_id + @property + def finished(self): + return self.ecstate in [ECState.FAILED, ECState.TERMINATED] + + def wait_finished(self, guids): + while not all([self.state(guid) == ResourceState.FINISHED \ + for guid in guids]) and not self.finished: + time.sleep(1) + def get_task(self, tid): return self._tasks.get(tid) @@ -80,16 +99,6 @@ class ExperimentController(object): return guid - def register_group(self, group): - guid = self._guid_generator.next() - - if not isinstance(group, list): - group = [group] - - self._groups[guid] = group - - return guid - def get_attributes(self, guid): rm = self.get_resource(guid) return rm.get_attributes() @@ -278,10 +287,15 @@ class ExperimentController(object): thread = threading.Thread(target = steps, args = (rm,)) threads.append(thread) + thread.setDaemon(True) thread.start() - for thread in threads: - thread.join() + while list(threads) and not self.finished: + thread = threads[0] + # Time out after 5 seconds to check EC not terminated + thread.join(5) + if not thread.is_alive(): + threads.remove(thread) def release(self, group = None): if not group: @@ -292,18 +306,25 @@ class ExperimentController(object): rm = self.get_resource(guid) thread = threading.Thread(target=rm.release) threads.append(thread) + thread.setDaemon(True) thread.start() - for thread in threads: - thread.join() + while list(threads) and not self.finished: + thread = threads[0] + # Time out after 5 seconds to check EC not terminated + thread.join(5) + if not thread.is_alive(): + threads.remove(thread) + + self._state = ECState.TERMINATED def shutdown(self): self.release() - self._stop = True self._cond.acquire() self._cond.notify() self._cond.release() + if self._thread.is_alive(): self._thread.join() @@ -342,7 +363,7 @@ class ExperimentController(object): runner.start() try: - while not self._stop: + while not self.finished: self._cond.acquire() task = self._scheduler.next() self._cond.release() @@ -369,11 +390,19 @@ class ExperimentController(object): else: # Process tasks in parallel runner.put(self._execute, task) - except: + + except: import traceback err = traceback.format_exc() self._logger.error("Error while processing tasks in the EC: %s" % err) + self._state = ECState.FAILED + return + + # Mark EC state as terminated + if self.ecstate == ECState.RUNNING: + self._state = ECState.TERMINATED + def _execute(self, task): # Invoke callback task.status = TaskStatus.DONE @@ -383,8 +412,19 @@ class ExperimentController(object): except: import traceback err = traceback.format_exc() - self._logger.error("Error while executing event: %s" % err) - task.result = err task.status = TaskStatus.ERROR + + self._logger.error("Error occurred while executing task: %s" % err) + + # Mark the EC as failed + self._state = ECState.FAILED + + # Wake up the EC in case it was sleeping + self._cond.acquire() + self._cond.notify() + self._cond.release() + + # Propage error to the ParallelRunner + raise