X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=5ec341103b5af574a152c0017e2711ecc41acc66;hb=87f44a7c2853afb7021276dd3700858cff950703;hp=f2a19251acc8bd1a6d72f9efcfc47bc65a2e07c3;hpb=035281100ca10f829cdf17c16e50f1e13e011e2a;p=nepi.git

diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py
index f2a19251..5ec34110 100644
--- a/src/nepi/execution/ec.py
+++ b/src/nepi/execution/ec.py
@@ -27,28 +27,94 @@ import threading
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
-from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
 from nepi.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
-# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
+# TODO: Allow reconnecting to a running experiment instance (reconnect mode vs deploy mode)
 
 class ECState(object):
+    """ State of the Experiment Controller
+
+    """
     RUNNING = 1
     FAILED = 2
     TERMINATED = 3
 
 class ExperimentController(object):
-    def __init__(self, exp_id = None, root_dir = "/tmp"):
+    """
+    .. class:: Class Args :
+
+        :param exp_id: Human readable identifier for the experiment scenario.
+                       It will be used in the name of the directory
+                       where experiment-related information is stored.
+        :type exp_id: str
+
+    .. note::
+
+        An experiment, or scenario, is defined by a concrete use, behavior,
+        configuration and interconnection of resources that describe a single
+        experiment case (we call this the experiment description).
+        The same experiment (scenario) can be run many times.
+
+        The ExperimentController (EC) is the entity responsible for
+        managing an experiment instance (run). The same scenario can be
+        recreated (and re-run) by instantiating an EC and recreating
+        the same experiment description.
+
+        In NEPI, an experiment is represented as a graph of interconnected
+        resources. A resource is a generic concept in the sense that any
+        component taking part in an experiment, whether physical or
+        virtual, is considered a resource. A resource could be a host,
+        a virtual machine, an application, a simulator, or an IP address.
+
+        A ResourceManager (RM) is the entity responsible for managing a
+        single resource. ResourceManagers are specific to a resource
+        type (i.e. an RM to control a Linux application will not be
+        the same as the RM used to control an ns-3 simulation).
+        In order for a new type of resource to be supported in NEPI,
+        a new RM must be implemented. NEPI already provides different
+        RMs to control basic resources, and new ones can be extended from
+        the existing ones.
+
+        Through the EC interface the user can create ResourceManagers (RMs),
+        configure them and interconnect them, in order to describe an experiment.
+        Describing an experiment through the EC does not run the experiment.
+        Only when the 'deploy()' method is invoked on the EC will the EC take
+        actions to transform the 'described' experiment into a 'running' experiment.
+
+        While the experiment is running, it is possible to continue to
+        create/configure/connect RMs, and to deploy them to involve new
+        resources in the experiment (this is known as 'interactive' deployment).
+
+        An experiment in NEPI is identified by a string id,
+        which is either given by the user, or automatically generated by NEPI.
+        The purpose of this identifier is to separate files and results that
+        belong to different experiment scenarios.
+        However, since the same 'experiment' can be run many times, the experiment
+        id is not enough to identify an experiment instance (run).
+        For this reason, the ExperimentController has two identifiers: the
+        exp_id, which can be re-used by different ExperimentController instances,
+        and the run_id, which is unique to an ExperimentController instance and
+        is automatically generated by NEPI.
+
+    """
+
+    def __init__(self, exp_id = None):
         super(ExperimentController, self).__init__()
         # root directory to store files
-        self._root_dir = root_dir
 
-        # experiment identifier given by the user
-        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+        # Run identifier. It identifies a concrete instance (run) of an experiment.
+        # Since the same experiment (same configuration) can be run many times,
+        # this id makes it possible to identify a concrete experiment run
+        self._run_id = tsformat()
+
+        # Experiment identifier. Usually assigned by the user
+        self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
 
@@ -76,44 +142,120 @@ class ExperimentController(object):
 
     @property
     def logger(self):
+        """ Return the logger of the Experiment Controller
+
+        """
         return self._logger
 
     @property
     def ecstate(self):
+        """ Return the state of the Experiment Controller
+
+        """
         return self._state
 
     @property
    def exp_id(self):
-        exp_id = self._exp_id
-        if not exp_id.startswith("nepi-"):
-            exp_id = "nepi-" + exp_id
-        return exp_id
+        """ Return the experiment id assigned by the user
+
+        """
+        return self._exp_id
+
+    @property
+    def run_id(self):
+        """ Return the experiment instance (run) identifier
+
+        """
+        return self._run_id
 
     @property
     def finished(self):
+        """ Return True if the Experiment Controller is in a final state,
+            either TERMINATED or FAILED
+
+        """
         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
 
     def wait_finished(self, guids):
-        # Take into account if only one guids is given in parameter
-        while not all([self.state(guid) in [ResourceState.FINISHED, 
-                ResourceState.STOPPED, 
-                ResourceState.FAILED] \
-                for guid in guids]) and not self.finished:
-            # We keep the sleep as large as possible to 
-            # decrese the number of RM state requests
+        """ Blocking method that waits until all the RMs in the 'guids' list
+            have reached the state FINISHED
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids)
+
+    def wait_started(self, guids):
+        """ Blocking method that waits until all the RMs in the 'guids' list
+            have reached the state STARTED
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids, states = [ResourceState.STARTED,
+            ResourceState.STOPPED,
+            ResourceState.FINISHED])
+
+    def wait(self, guids, states = [ResourceState.FINISHED,
+            ResourceState.STOPPED]):
+        """ Blocking method that waits until all the RMs in the 'guids' list
+            have reached one of the given 'states', or until a failure occurs
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        if isinstance(guids, int):
+            guids = [guids]
+
+        while not all([self.state(guid) in states for guid in guids]) and \
+                not any([self.state(guid) in [
+                        ResourceState.FAILED] for guid in guids]) and \
+                not self.finished:
+            # debug logging
+            waited = ""
+            for guid in guids:
+                waited += "guid %d - %s \n" % (guid,
+                    self.state(guid, hr = True))
+            self.logger.debug(" WAITING FOR %s " % waited )
+
+            # We keep the sleep big to decrease the number of RM state queries
             time.sleep(2)
-        
+
     def get_task(self, tid):
+        """ Get a specific task
+
+        :param tid: Id of the task
+        :type tid: int
+        :rtype: Task
+        """
         return self._tasks.get(tid)
 
     def get_resource(self, guid):
+        """ Get a specific Resource Manager
+
+        :param guid: Id of the resource
+        :type guid: int
+        :rtype: ResourceManager
+        """
         return self._resources.get(guid)
 
     @property
     def resources(self):
+        """ Return the ids of all the Resource Managers
+
+        :rtype: set
+
+        """
         return self._resources.keys()
 
     def register_resource(self, rtype, guid = None):
+        """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
+        for the RM of type 'rtype' and adds it to the list of resources.
+
+        :param rtype: Type of the RM
+        :type rtype: str
+        :return: Id of the RM
+        :rtype: int
+        """
         # Get next available guid
         guid = self._guid_generator.next(guid)
 
@@ -126,15 +268,31 @@ class ExperimentController(object):
         return guid
 
     def get_attributes(self, guid):
+        """ Return all the attributes of a specific RM
+
+        :param guid: Guid of the RM
+        :type guid: int
+        :return: List of attributes
+        :rtype: list
+        """
         rm = self.get_resource(guid)
         return rm.get_attributes()
 
     def register_connection(self, guid1, guid2):
+        """ Register a connection between guid1 and guid2.
+            The declaration order is not important
+
+        :param guid1: First guid to connect
+        :type guid1: int
+
+        :param guid2: Second guid to connect
+        :type guid2: int
+        """
         rm1 = self.get_resource(guid1)
         rm2 = self.get_resource(guid2)
 
-        rm1.connect(guid2)
-        rm2.connect(guid1)
+        rm1.register_connection(guid2)
+        rm2.register_connection(guid1)
 
     def register_condition(self, group1, action, group2, state,
             time = None):
@@ -200,18 +358,51 @@ class ExperimentController(object):
         return rm.trace(name, attr, block, offset)
 
     def discover(self, guid):
+        """ Discover a specific RM defined by its 'guid'
+
+        :param guid: Guid of the RM
+        :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.discover()
 
     def provision(self, guid):
+        """ Provision a specific RM defined by its 'guid'
+
+        :param guid: Guid of the RM
+        :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.provision()
 
     def get(self, guid, name):
+        """ Get the value of the attribute 'name' of the RM 'guid'
+
+        :param guid: Guid of the RM
+        :type guid: int
+
+        :param name: attribute's name
+        :type name: str
+
+        """
         rm = self.get_resource(guid)
         return rm.get(name)
 
     def set(self, guid, name, value):
+        """ Set the value 'value' on the attribute 'name'
+            of the RM 'guid'
+
+        :param guid: Guid of the RM
+        :type guid: int
+
+        :param name: attribute's name
+        :type name: str
+
+        :param value: attribute's value
+
+        """
         rm = self.get_resource(guid)
         return rm.set(name, value)
 
@@ -227,16 +418,30 @@ class ExperimentController(object):
         """
         rm = self.get_resource(guid)
+        state = rm.state
+
         if hr:
-            return ResourceState2str.get(rm.state)
+            return ResourceState2str.get(state)
 
-        return rm.state
+        return state
 
     def stop(self, guid):
+        """ Stop a specific RM defined by its 'guid'
+
+        :param guid: Guid of the RM
+        :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.stop()
 
     def start(self, guid):
+        """ Start a specific RM defined by its 'guid'
+
+        :param guid: Guid of the RM
+        :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.start()
 
@@ -278,10 +483,22 @@ class ExperimentController(object):
         rm.set_with_conditions(name, value, group2, state, time)
 
     def stop_with_conditions(self, guid):
+        """ Stop a specific RM defined by its 'guid' only if all the conditions are true
+
+        :param guid: Guid of the RM
+        :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.stop_with_conditions()
 
     def start_with_conditions(self, guid):
+        """ Start a specific RM defined by its 'guid' only if all the conditions are true
+
+        :param guid: Guid of the RM
+        :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.start_with_condition()
 
@@ -299,7 +516,15 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not group:
-            group = self.resources
+            # By default, if no deployment group is indicated,
+            # all RMs that are undeployed will be deployed
+            group = []
+            for guid in self.resources:
+                if self.state(guid) == ResourceState.NEW:
+                    group.append(guid)
+
+        if isinstance(group, int):
+            group = [group]
 
         # Before starting deployment we disorder the group list with the
         # purpose of speeding up the whole deployment process.
@@ -317,8 +542,7 @@ class ExperimentController(object):
         def wait_all_and_start(group):
             reschedule = False
             for guid in group:
-                rm = self.get_resource(guid)
-                if rm.state < ResourceState.READY:
+                if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
 
@@ -329,7 +553,7 @@ class ExperimentController(object):
                 # If all resources are read, we schedule the start
                 for guid in group:
                     rm = self.get_resource(guid)
-                    self.schedule("0.01s", rm.start_with_conditions)
+                    self.schedule("0s", rm.start_with_conditions)
 
         if wait_all_ready:
             # Schedule the function that will check all resources are
@@ -341,7 +565,7 @@ class ExperimentController(object):
 
         for guid in group:
             rm = self.get_resource(guid)
-            self.schedule("0.001s", rm.deploy)
+            self.schedule("0s", rm.deploy)
 
             if not wait_all_ready:
                 self.schedule("1s", rm.start_with_conditions)
@@ -351,8 +575,14 @@ class ExperimentController(object):
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
-
     def release(self, group = None):
+        """ Release the elements of the list 'group', or all the resources
+        if no group is specified
+
+        :param group: List of RM
+        :type group: list
+
+        """
         if not group:
             group = self.resources
 
@@ -372,9 +602,17 @@ class ExperimentController(object):
                 threads.remove(thread)
 
     def shutdown(self):
+        """ Shutdown the Experiment Controller.
+            Releases all the resources and stops the task processing thread
+
+        """
         self.release()
 
-        self._stop_scheduler()
+        # Mark the EC state as TERMINATED
+        self._state = ECState.TERMINATED
+
+        # Notify condition to wake up the processing thread
+        self._notify()
 
         if self._thread.is_alive():
            self._thread.join()
@@ -382,79 +620,127 @@ class ExperimentController(object):
     def schedule(self, date, callback, track = False):
         """ Schedule a callback to be executed at time date.
 
-            date    string containing execution time for the task.
+        :param date: string containing execution time for the task.
                 It can be expressed as an absolute time, using timestamp
                 format, or as a relative time matching ^\d+.\d+(h|m|s|ms|us)$
 
-            callback    code to be executed for the task. Must be a
+        :param callback: code to be executed for the task. Must be a
                 Python function, and receives args and kwargs as arguments.
-            track    if set to True, the task will be retrivable with
+        :param track: if set to True, the task will be retrievable with
                 the get_task() method
+
+        :return: The Id of the task
         """
-        timestamp = strfvalid(date)
-        
+        timestamp = stabsformat(date)
         task = Task(timestamp, callback)
         task = self._scheduler.schedule(task)
 
         if track:
             self._tasks[task.id] = task
-  
+
         # Notify condition to wake up the processing thread
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
+        self._notify()
 
         return task.id
 
     def _process(self):
-        runner = ParallelRun(maxthreads = 50)
+        """ Process scheduled tasks.
+
+        .. note::
+
+            The _process method is executed in an independent thread held by the
+            ExperimentController for as long as the experiment is running.
+
+            Tasks are scheduled by invoking the schedule method with a target callback.
+            The schedule method is given an execution time which controls the
+            order in which tasks are processed.
+
+            Tasks are processed in parallel using multithreading.
+            The environment variable NEPI_NTHREADS can be used to control
+            the number of threads used to process tasks. The default value is 50.
+
+            Exception handling:
+
+            To execute tasks in parallel, a ParallelRunner (PR) object, holding
+            a pool of threads (workers), is used.
+            For each available thread in the PR, the next task popped from
+            the scheduler queue is 'put' in the PR.
+            Upon receiving a task to execute, each PR worker (thread) invokes the
+            _execute method of the EC, passing the task as argument.
+            This method calls task.callback inside a try/except block. If an
+            exception is raised by the task.callback, it will be trapped by the
+            try block, logged to standard error (usually the console), and the EC
+            state will be set to ECState.FAILED.
+            The invocation of _notify immediately afterwards forces the processing
+            loop in the _process method to wake up if it was blocked waiting for new
+            tasks to arrive, and to check the EC state.
+            As the EC is in FAILED state, the processing loop exits and the
+            'finally' block is invoked. In the 'finally' block, the 'sync' method
+            of the PR is invoked, which forces the PR to raise any unchecked errors
+            that might have been raised by the workers.
+
+        """
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+
+        runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
         try:
             while not self.finished:
                 self._cond.acquire()
+
                 task = self._scheduler.next()
-                self._cond.release()
 
                 if not task:
-                    # It there are not tasks in the tasks queue we need to 
-                    # wait until a call to schedule wakes us up
-                    self._cond.acquire()
+                    # No task to execute. Wait for a new task to be scheduled.
                     self._cond.wait()
-                    self._cond.release()
-                else: 
-                    # If the task timestamp is in the future the thread needs to wait
-                    # until time elapse or until another task is scheduled
-                    now = strfnow()
+                else:
+                    # The task timestamp is in the future. Wait for timeout
+                    # or until another task is scheduled.
+                    now = tnow()
                     if now < task.timestamp:
-                        # Calculate time difference in seconds
-                        timeout = strfdiff(task.timestamp, now)
+                        # Calculate timeout in seconds
+                        timeout = tdiffsec(task.timestamp, now)
+
                         # Re-schedule task with the same timestamp
                         self._scheduler.schedule(task)
-                        # Sleep until timeout or until a new task awakes the condition
-                        self._cond.acquire()
+
+                        task = None
+
+                        # Wait for the timeout or until a new task wakes up the condition
                         self._cond.wait(timeout)
-                        self._cond.release()
-                    else:
-                        # Process tasks in parallel
-                        runner.put(self._execute, task)
+
+                self._cond.release()
+
+                if task:
+                    # Process tasks in parallel
+                    runner.put(self._execute, task)
 
         except:
             import traceback
             err = traceback.format_exc()
-            self._logger.error("Error while processing tasks in the EC: %s" % err)
+            self.logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-        
-        # Mark EC state as terminated
-        if self.ecstate == ECState.RUNNING:
-            # Synchronize to get errors if occurred
+        finally:
+            self.logger.debug("Exiting the task processing loop ... ")
             runner.sync()
-            self._state = ECState.TERMINATED
 
     def _execute(self, task):
+        """ Executes a single task.
+
+        :param task: Object containing the callback to execute
+        :type task: Task
+
+        .. note::
+
+            If the invocation of the task callback raises an
+            exception, the processing thread of the ExperimentController
+            will be stopped and the experiment will be aborted.
+
+        """
         # Invoke callback
         task.status = TaskStatus.DONE
 
@@ -466,20 +752,23 @@ class ExperimentController(object):
             task.result = err
             task.status = TaskStatus.ERROR
 
-            self._logger.error("Error occurred while executing task: %s" % err)
+            self.logger.error("Error occurred while executing task: %s" % err)
+
+            # Set the EC to FAILED state (this will force the task
+            # processing thread to exit)
+            self._state = ECState.FAILED
 
-            self._stop_scheduler()
+            # Notify condition to wake up the processing thread
+            self._notify()
 
             # Propage error to the ParallelRunner
             raise
 
-    def _stop_scheduler(self):
-        # Mark the EC as failed
-        self._state = ECState.FAILED
-        
-        # Wake up the EC in case it was sleeping
+    def _notify(self):
+        """ Wake up the processing thread in case it is blocked waiting
+        for a new task to be scheduled.
+        """
        self._cond.acquire()
        self._cond.notify()
        self._cond.release()
-
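
For readers of this change, the following is a minimal usage sketch of the ExperimentController API documented in the new docstrings above (register_resource, set, register_connection, deploy, wait_finished, state, shutdown, and the exp_id constructor argument). It is not part of the commit: the "LinuxNode"/"LinuxApplication" resource types and the "hostname"/"command" attribute names are illustrative assumptions and should be replaced by whatever ResourceManagers are actually registered in your ResourceFactory.

    # Hypothetical example; resource types and attribute names are assumptions.
    from nepi.execution.ec import ExperimentController

    # exp_id names the scenario; the run_id is generated automatically
    ec = ExperimentController(exp_id = "my-experiment")

    # Describe the experiment: register RMs and interconnect them
    node = ec.register_resource("LinuxNode")          # assumed RM type
    ec.set(node, "hostname", "node.example.org")      # assumed attribute

    app = ec.register_resource("LinuxApplication")    # assumed RM type
    ec.set(app, "command", "ping -c3 localhost")      # assumed attribute
    ec.register_connection(app, node)

    # deploy() turns the described experiment into a running one;
    # by default it deploys every RM still in the NEW state
    ec.deploy()

    # Block until the application RM reaches FINISHED (or STOPPED/FAILED)
    ec.wait_finished([app])
    print ec.state(app, hr = True)

    # Release all resources and stop the task processing thread
    ec.shutdown()

As described in the _process() docstring, the number of worker threads used by the EC to process scheduled tasks can be tuned through the NEPI_NTHREADS environment variable (default 50).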