import logging
import os
+import random
import sys
import time
import threading
from neco.util import guid
+from neco.util.parallel import ParallelRun
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
from neco.execution.resource import ResourceFactory, ResourceAction, \
ResourceState
from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
-from neco.util.parallel import ParallelRun
+from neco.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
+# TODO: When a failure occurs during deployment scp and ssh processes are left running behind!!
+
+class ECState(object):
+ RUNNING = 1
+ FAILED = 2
+ TERMINATED = 3
class ExperimentController(object):
- def __init__(self, root_dir = "/tmp", loglevel = 'error'):
+ def __init__(self, exp_id = None, root_dir = "/tmp"):
super(ExperimentController, self).__init__()
# root directory to store files
self._root_dir = root_dir
+ # experiment identifier given by the user
+ self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
# Resource managers
self._resources = dict()
- # Resource managers
- self._group = dict()
-
# Scheduler
self._scheduler = HeapScheduler()
self._tasks = dict()
# Event processing thread
- self._stop = False
self._cond = threading.Condition()
self._thread = threading.Thread(target = self._process)
+ self._thread.setDaemon(True)
self._thread.start()
+ # EC state
+ self._state = ECState.RUNNING
+
# Logging
- self._logger = logging.getLogger("neco.execution.ec")
- self._logger.setLevel(getattr(logging, loglevel.upper()))
+ self._logger = logging.getLogger("ExperimentController")
+ @property
+ def logger(self):
+ return self._logger
+
+ @property
+ def ecstate(self):
+ return self._state
+
+ @property
+ def exp_id(self):
+ exp_id = self._exp_id
+ if not exp_id.startswith("nepi-"):
+ exp_id = "nepi-" + exp_id
+ return exp_id
+
+ @property
+ def finished(self):
+ return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+
+ def wait_finished(self, guids):
+ while not all([self.state(guid) == ResourceState.FINISHED \
+ for guid in guids]) and not self.finished:
+ # We keep the sleep as large as possible to
+        # decrease the number of RM state requests
+ time.sleep(2)
+
def get_task(self, tid):
return self._tasks.get(tid)
return guid
- def create_group(self, *args):
- guid = self._guid_generator.next(guid)
-
- grp = [arg for arg in args]
-
- self._resources[guid] = grp
-
- return guid
-
-
def get_attributes(self, guid):
rm = self.get_resource(guid)
return rm.get_attributes()
rm = self.get_resource(guid1)
rm.register_condition(action, group2, state, time)
+ def register_trace(self, guid, name):
+ """ Enable trace
+
+ :param name: Name of the trace
+ :type name: str
+ """
+ rm = self.get_resource(guid)
+ rm.register_trace(name)
+
+ def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+ """ Get information on collected trace
+
+ :param name: Name of the trace
+ :type name: str
+
+ :param attr: Can be one of:
+ - TraceAttr.ALL (complete trace content),
+ - TraceAttr.STREAM (block in bytes to read starting at offset),
+ - TraceAttr.PATH (full path to the trace file),
+ - TraceAttr.SIZE (size of trace file).
+ :type attr: str
+
+ :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :rtype: str
+ """
+ rm = self.get_resource(guid)
+ return rm.trace(name, attr, block, offset)
+
def discover(self, guid, filters):
rm = self.get_resource(guid)
return rm.discover(filters)
rm = self.get_resource(guid)
return rm.start_with_condition()
- def deploy(self, group = None, wait_all_deployed = True):
+ def deploy(self, group = None, wait_all_ready = True):
""" Deploy all resource manager in group
:param group: List of guids of RMs to deploy
:type group: list
- :param wait_all_deployed: Wait until all RMs are deployed in
+ :param wait_all_ready: Wait until all RMs are ready in
order to start the RMs
:type guid: int
"""
- def steps(rm):
- rm.deploy()
- rm.start_with_conditions()
+ self.logger.debug(" ------- DEPLOY START ------ ")
- # Only if the RM has STOP consitions we
- # schedule a stop. Otherwise the RM will stop immediately
- if rm.conditions.get(ResourceAction.STOP):
- rm.stop_with_conditions()
+ stop = []
+
+ def steps(rm):
+ try:
+ rm.deploy()
+ rm.start_with_conditions()
+
+ # Only if the RM has STOP conditions we
+ # schedule a stop. Otherwise the RM will stop immediately
+ if rm.conditions.get(ResourceAction.STOP):
+ rm.stop_with_conditions()
+ except:
+ import traceback
+ err = traceback.format_exc()
+
+ self._logger.error("Error occurred while deploying resources: %s" % err)
+
+ # stop deployment
+ stop.append(None)
if not group:
group = self.resources
+ # Before starting deployment we disorder the group list with the
+ # purpose of speeding up the whole deployment process.
+ # It is likely that the user inserted in the 'group' list closely
+        # related resources one after another (e.g. all applications
+ # connected to the same node can likely appear one after another).
+ # This can originate a slow down in the deployment since the N
+        # threads the parallel runner uses to process tasks may all
+ # be taken up by the same family of resources waiting for the
+ # same conditions.
+ # If we disorder the group list, this problem can be mitigated
+ random.shuffle(group)
+
threads = []
for guid in group:
rm = self.get_resource(guid)
- if wait_all_deployed:
+ if wait_all_ready:
towait = list(group)
towait.remove(guid)
self.register_condition(guid, ResourceAction.START,
- towait, ResourceState.DEPLOYED)
+ towait, ResourceState.READY)
thread = threading.Thread(target = steps, args = (rm,))
threads.append(thread)
+ thread.setDaemon(True)
thread.start()
- for thread in threads:
- thread.join()
+ while list(threads) and not self.finished and not stop:
+ thread = threads[0]
+            # Time out after 1 second to check EC not terminated
+ thread.join(1)
+ if not thread.is_alive():
+ threads.remove(thread)
+
+ if stop:
+ # stop the scheduler
+ self._stop_scheduler()
+
+ if self._thread.is_alive():
+ self._thread.join()
+
+ raise RuntimeError, "Error occurred, interrupting deployment "
def release(self, group = None):
if not group:
rm = self.get_resource(guid)
thread = threading.Thread(target=rm.release)
threads.append(thread)
+ thread.setDaemon(True)
thread.start()
- for thread in threads:
- thread.join()
+ while list(threads) and not self.finished:
+ thread = threads[0]
+ # Time out after 5 seconds to check EC not terminated
+ thread.join(5)
+ if not thread.is_alive():
+ threads.remove(thread)
def shutdown(self):
self.release()
+
+ self._stop_scheduler()
- self._stop = True
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
if self._thread.is_alive():
self._thread.join()
runner.start()
try:
- while not self._stop:
+ while not self.finished:
self._cond.acquire()
task = self._scheduler.next()
self._cond.release()
else:
# Process tasks in parallel
runner.put(self._execute, task)
- except:
+
+ except:
import traceback
err = traceback.format_exc()
self._logger.error("Error while processing tasks in the EC: %s" % err)
+ self._state = ECState.FAILED
+ finally:
+ runner.sync()
+
+ # Mark EC state as terminated
+ if self.ecstate == ECState.RUNNING:
+ self._state = ECState.TERMINATED
+
def _execute(self, task):
# Invoke callback
task.status = TaskStatus.DONE
except:
import traceback
err = traceback.format_exc()
- self._logger.error("Error while executing event: %s" % err)
-
task.result = err
task.status = TaskStatus.ERROR
+
+ self._logger.error("Error occurred while executing task: %s" % err)
+
+ self._stop_scheduler()
+
+        # Propagate error to the ParallelRunner
+ raise
+
+ def _stop_scheduler(self):
+ # Mark the EC as failed
+ self._state = ECState.FAILED
+
+ # Wake up the EC in case it was sleeping
+ self._cond.acquire()
+ self._cond.notify()
+ self._cond.release()
+