from neco.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
+# TODO: Improve speed. Too slow... !!
+
+class ECState(object):
+ RUNNING = 1
+ FAILED = 2
+ TERMINATED = 3
class ExperimentController(object):
def __init__(self, exp_id = None, root_dir = "/tmp"):
# Resource managers
self._resources = dict()
- # Resource managers
- self._group = dict()
-
# Scheduler
self._scheduler = HeapScheduler()
self._tasks = dict()
# Event processing thread
- self._stop = False
self._cond = threading.Condition()
self._thread = threading.Thread(target = self._process)
+ self._thread.setDaemon(True)
self._thread.start()
+ # EC state
+ self._state = ECState.RUNNING
+
# Logging
- self._logger = logging.getLogger("neco.execution.ec")
+ self._logger = logging.getLogger("ExperimentController")
@property
def logger(self):
return self._logger
+ @property
+ def ecstate(self):
+ return self._state
+
@property
def exp_id(self):
exp_id = self._exp_id
exp_id = "nepi-" + exp_id
return exp_id
+ @property
+ def finished(self):
+ return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+
+ def wait_finished(self, guids):
+ while not all([self.state(guid) == ResourceState.FINISHED \
+ for guid in guids]) and not self.finished:
+ # We keep the sleep as large as possible to
+ # decrese the number of RM state requests
+ time.sleep(2)
+
def get_task(self, tid):
return self._tasks.get(tid)
return guid
- def register_group(self, group):
- guid = self._guid_generator.next()
-
- if not isinstance(group, list):
- group = [group]
-
- self._groups[guid] = group
-
- return guid
-
def get_attributes(self, guid):
rm = self.get_resource(guid)
return rm.get_attributes()
thread = threading.Thread(target = steps, args = (rm,))
threads.append(thread)
+ thread.setDaemon(True)
thread.start()
- for thread in threads:
- thread.join()
+ while list(threads) and not self.finished:
+ thread = threads[0]
+ # Time out after 5 seconds to check EC not terminated
+ thread.join(5)
+ if not thread.is_alive():
+ threads.remove(thread)
def release(self, group = None):
if not group:
rm = self.get_resource(guid)
thread = threading.Thread(target=rm.release)
threads.append(thread)
+ thread.setDaemon(True)
thread.start()
- for thread in threads:
- thread.join()
+ while list(threads) and not self.finished:
+ thread = threads[0]
+ # Time out after 5 seconds to check EC not terminated
+ thread.join(5)
+ if not thread.is_alive():
+ threads.remove(thread)
+
+ self._state = ECState.TERMINATED
def shutdown(self):
self.release()
- self._stop = True
self._cond.acquire()
self._cond.notify()
self._cond.release()
+
if self._thread.is_alive():
self._thread.join()
runner.start()
try:
- while not self._stop:
+ while not self.finished:
self._cond.acquire()
task = self._scheduler.next()
self._cond.release()
else:
# Process tasks in parallel
runner.put(self._execute, task)
- except:
+
+ except:
import traceback
err = traceback.format_exc()
self._logger.error("Error while processing tasks in the EC: %s" % err)
+ self._state = ECState.FAILED
+ return
+
+ # Mark EC state as terminated
+ if self.ecstate == ECState.RUNNING:
+ self._state = ECState.TERMINATED
+
def _execute(self, task):
# Invoke callback
task.status = TaskStatus.DONE
except:
import traceback
err = traceback.format_exc()
- self._logger.error("Error while executing event: %s" % err)
-
task.result = err
task.status = TaskStatus.ERROR
+
+ self._logger.error("Error occurred while executing task: %s" % err)
+
+ # Mark the EC as failed
+ self._state = ECState.FAILED
+
+ # Wake up the EC in case it was sleeping
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+
+ # Propage error to the ParallelRunner
+ raise