X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=2e573dd91f0da821b7fb6df0ec4c974e973bc07f;hb=cb816db417dff4d0f985455c1d7cbd261fd40f9b;hp=7d8b990bfc8f7ae3ee49f69a1371ce67f0fc9420;hpb=2efd5eabeba8a6577ace9132d6336d44be0510e8;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 7d8b990b..2e573dd9 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -24,6 +24,9 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \ ResourceState, ResourceState2str from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus from nepi.execution.trace import TraceAttr +from nepi.util.serializer import ECSerializer, SFormats +from nepi.util.plotter import ECPlotter, PFormats +from nepi.util.netgraph import NetGraph, TopologyType # TODO: use multiprocessing instead of threading # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode) @@ -32,6 +35,7 @@ import functools import logging import os import sys +import tempfile import time import threading import weakref @@ -51,6 +55,7 @@ class FailureManager(object): def __init__(self, ec): self._ec = weakref.ref(ec) self._failure_level = FailureLevel.OK + self._abort = False @property def ec(self): @@ -62,22 +67,23 @@ class FailureManager(object): @property def abort(self): + return self._abort + + def eval_failure(self, guid): if self._failure_level == FailureLevel.OK: - for guid in self.ec.resources: - state = self.ec.state(guid) - critical = self.ec.get(guid, "critical") - if state == ResourceState.FAILED and critical: - self._failure_level = FailureLevel.RM_FAILURE - self.ec.logger.debug("RM critical failure occurred on guid %d." \ - " Setting EC FAILURE LEVEL to RM_FAILURE" % guid) - break + rm = self.ec.get_resource(guid) + state = rm.state + critical = rm.get("critical") - return self._failure_level != FailureLevel.OK + if state == ResourceState.FAILED and critical: + self._failure_level = FailureLevel.RM_FAILURE + self._abort = True + self.ec.logger.debug("RM critical failure occurred on guid %d." \ + " Setting EC FAILURE LEVEL to RM_FAILURE" % guid) def set_ec_failure(self): self._failure_level = FailureLevel.EC_FAILURE - class ECState(object): """ Possible states for an ExperimentController @@ -96,7 +102,7 @@ class ExperimentController(object): .. note:: An experiment, or scenario, is defined by a concrete set of resources, - behavior, configuration and interconnection of those resources. + and the behavior, configuration and interconnection of those resources. The Experiment Description (ED) is a detailed representation of a single experiment. It contains all the necessary information to allow repeating the experiment. NEPI allows to describe @@ -111,7 +117,7 @@ class ExperimentController(object): recreated (and re-run) by instantiating an EC and recreating the same experiment description. - In NEPI, an experiment is represented as a graph of interconnected + An experiment is represented as a graph of interconnected resources. A resource is a generic concept in the sense that any component taking part of an experiment, whether physical of virtual, is considered a resource. A resources could be a host, @@ -121,10 +127,9 @@ class ExperimentController(object): single resource. ResourceManagers are specific to a resource type (i.e. An RM to control a Linux application will not be the same as the RM used to control a ns-3 simulation). - To support a new type of resource in NEPI, a new RM must be - implemented. NEPI already provides a variety of - RMs to control basic resources, and new can be extended from - the existing ones. + To support a new type of resource, a new RM must be implemented. + NEPI already provides a variety of RMs to control basic resources, + and new can be extended from the existing ones. Through the EC interface the user can create ResourceManagers (RMs), configure them and interconnect them, to describe an experiment. @@ -146,10 +151,40 @@ class ExperimentController(object): exp_id, which can be re-used in different ExperimentController, and the run_id, which is unique to one ExperimentController instance, and is automatically generated by NEPI. - + """ - def __init__(self, exp_id = None): + @classmethod + def load(cls, filepath, format = SFormats.XML): + serializer = ECSerializer() + ec = serializer.load(filepath) + return ec + + def __init__(self, exp_id = None, local_dir = None, persist = False, + add_node_callback = None, add_edge_callback = None, **kwargs): + """ ExperimentController entity to model an execute a network + experiment. + + :param exp_id: Human readable name to identify the experiment + :type exp_id: str + + :param local_dir: Path to local directory where to store experiment + related files + :type local_dir: str + + :param persist: Save an XML description of the experiment after + completion at local_dir + :type persist: bool + + :param add_node_callback: Callback to invoke for node instantiation + when automatic topology creation mode is used + :type add_node_callback: function + + :param add_edge_callback: Callback to invoke for edge instantiation + when automatic topology creation mode is used + :type add_edge_callback: function + + """ super(ExperimentController, self).__init__() # Logging @@ -167,6 +202,17 @@ class ExperimentController(object): # resources used, etc) self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex') + # Local path where to store experiment related files (results, etc) + if not local_dir: + local_dir = tempfile.gettempdir() # /tmp + + self._local_dir = local_dir + self._exp_dir = os.path.join(local_dir, self.exp_id) + self._run_dir = os.path.join(self.exp_dir, self.run_id) + + # If True persist the experiment controller in XML format, after completion + self._persist = persist + # generator of globally unique ids self._guid_generator = guid.GuidGenerator() @@ -196,17 +242,23 @@ class ExperimentController(object): # EC state self._state = ECState.RUNNING + # Automatically construct experiment description + self._netgraph = None + if add_node_callback or add_edge_callback or kwargs.get("topology"): + self._build_from_netgraph(add_node_callback, add_edge_callback, + **kwargs) + # The runner is a pool of threads used to parallelize # execution of tasks - nthreads = int(os.environ.get("NEPI_NTHREADS", "50")) - self._runner = ParallelRun(maxthreads = nthreads) + self._nthreads = 20 + self._runner = None # Event processing thread self._cond = threading.Condition() self._thread = threading.Thread(target = self._process) self._thread.setDaemon(True) self._thread.start() - + @property def logger(self): """ Returns the logger instance of the Experiment Controller @@ -214,6 +266,14 @@ class ExperimentController(object): """ return self._logger + @property + def failure_level(self): + """ Returns the level of FAILURE of th experiment + + """ + + return self._fm._failure_level + @property def ecstate(self): """ Returns the state of the Experiment Controller @@ -236,6 +296,52 @@ class ExperimentController(object): """ return self._run_id + @property + def nthreads(self): + """ Returns the number of processing nthreads used + + """ + return self._nthreads + + @property + def local_dir(self): + """ Root local directory for experiment files + + """ + return self._local_dir + + @property + def exp_dir(self): + """ Local directory to store results and other files related to the + experiment. + + """ + return self._exp_dir + + @property + def run_dir(self): + """ Local directory to store results and other files related to the + experiment run. + + """ + return self._run_dir + + @property + def persist(self): + """ If True, persists the ExperimentController to XML format upon + experiment completion + + """ + return self._persist + + @property + def netgraph(self): + """ Return NetGraph instance if experiment description was automatically + generated + + """ + return self._netgraph + @property def abort(self): """ Returns True if the experiment has failed and should be interrupted, @@ -244,6 +350,16 @@ class ExperimentController(object): """ return self._fm.abort + def inform_failure(self, guid): + """ Reports a failure in a RM to the EC for evaluation + + :param guid: Resource id + :type guid: int + + """ + + return self._fm.eval_failure(guid) + def wait_finished(self, guids): """ Blocking method that waits until all RMs in the 'guids' list have reached a state >= STOPPED (i.e. STOPPED, FAILED or @@ -317,7 +433,6 @@ class ExperimentController(object): :type guids: list """ - if isinstance(guids, int): guids = [guids] @@ -331,23 +446,46 @@ class ExperimentController(object): break # If a guid reached one of the target states, remove it from list - guid = guids[0] - rstate = self.state(guid) + guid = guids.pop() + rm = self.get_resource(guid) + rstate = rm.state - hrrstate = ResourceState2str.get(rstate) - hrstate = ResourceState2str.get(state) - if rstate >= state: - guids.remove(guid) - rm = self.get_resource(guid) self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % ( - rm.get_rtype(), guid, hrrstate, hrstate)) + rm.get_rtype(), guid, rstate, state)) else: # Debug... self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % ( - guid, hrrstate, hrstate)) + guid, rstate, state)) + + guids.append(guid) + time.sleep(0.5) - + + def plot(self, dirpath = None, format= PFormats.FIGURE, show = False): + plotter = ECPlotter() + fpath = plotter.plot(self, dirpath = dirpath, format= format, + show = show) + return fpath + + def serialize(self, format = SFormats.XML): + serializer = ECSerializer() + sec = serializer.load(self, format = format) + return sec + + def save(self, dirpath = None, format = SFormats.XML): + if dirpath == None: + dirpath = self.run_dir + + try: + os.makedirs(dirpath) + except OSError: + pass + + serializer = ECSerializer() + path = serializer.save(self, dirpath, format = format) + return path + def get_task(self, tid): """ Returns a task by its id @@ -362,23 +500,59 @@ class ExperimentController(object): def get_resource(self, guid): """ Returns a registered ResourceManager by its guid - :param guid: Id of the task + :param guid: Id of the resource :type guid: int :rtype: ResourceManager """ - return self._resources.get(guid) + rm = self._resources.get(guid) + return rm + + def get_resources_by_type(self, rtype): + """ Returns the ResourceManager objects of type rtype + + :param rtype: Resource type + :type rtype: string + + :rtype: list of ResourceManagers + + """ + rms = [] + for guid, rm in self._resources.iteritems(): + if rm.get_rtype() == rtype: + rms.append(rm) + return rms + + def remove_resource(self, guid): + del self._resources[guid] @property def resources(self): - """ Returns the set() of guids of all the ResourceManager + """ Returns the guids of all ResourceManagers :return: Set of all RM guids - :rtype: set + :rtype: list """ - return self._resources.keys() + keys = self._resources.keys() + + return keys + + def filter_resources(self, rtype): + """ Returns the guids of all ResourceManagers of type rtype + + :param rtype: Resource type + :type rtype: string + + :rtype: list of guids + + """ + rms = [] + for guid, rm in self._resources.iteritems(): + if rm.get_rtype() == rtype: + rms.append(rm.guid) + return rms def register_resource(self, rtype, guid = None): """ Registers a new ResourceManager of type 'rtype' in the experiment @@ -607,7 +781,39 @@ class ExperimentController(object): """ rm = self.get_resource(guid) - return rm.set(name, value) + rm.set(name, value) + + def get_global(self, rtype, name): + """ Returns the value of the global attribute with name 'name' on the + RMs of rtype 'rtype'. + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :return: The value of the attribute with name 'name' + + """ + rclass = ResourceFactory.get_resource_type(rtype) + return rclass.get_global(name) + + def set_global(self, rtype, name, value): + """ Modifies the value of the global attribute with name 'name' on the + RMs of with rtype 'rtype'. + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :param value: Value of the attribute + + """ + rclass = ResourceFactory.get_resource_type(rtype) + return rclass.set_global(name, value) def state(self, guid, hr = False): """ Returns the state of a resource @@ -654,6 +860,41 @@ class ExperimentController(object): rm = self.get_resource(guid) return rm.start() + def get_start_time(self, guid): + """ Returns the start time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.start_time + + def get_stop_time(self, guid): + """ Returns the stop time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.stop_time + + def get_discover_time(self, guid): + """ Returns the discover time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.discover_time + + def get_provision_time(self, guid): + """ Returns the provision time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.provision_time + + def get_ready_time(self, guid): + """ Returns the deployment time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.ready_time + + def get_release_time(self, guid): + """ Returns the release time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.release_time + + def get_failed_time(self, guid): + """ Returns the time failure occured for the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.failed_time + def set_with_conditions(self, name, value, guids1, guids2, state, time = None): """ Modifies the value of attribute with name 'name' on all RMs @@ -713,8 +954,8 @@ class ExperimentController(object): if not guids: # If no guids list was passed, all 'NEW' RMs will be deployed guids = [] - for guid in self.resources: - if self.state(guid) == ResourceState.NEW: + for guid, rm in self._resources.iteritems(): + if rm.state == ResourceState.NEW: guids.append(guid) if isinstance(guids, int): @@ -754,6 +995,11 @@ class ExperimentController(object): rm = self.get_resource(guid) self.schedule("0s", rm.start_with_conditions) + if rm.conditions.get(ResourceAction.STOP): + # Only if the RM has STOP conditions we + # schedule a stop. Otherwise the RM will stop immediately + self.schedule("0s", rm.stop_with_conditions) + if wait_all_ready and new_group: # Schedule a function to check that all resources are # READY, and only then schedule the start. @@ -772,10 +1018,10 @@ class ExperimentController(object): if not wait_all_ready: self.schedule("0s", rm.start_with_conditions) - if rm.conditions.get(ResourceAction.STOP): - # Only if the RM has STOP conditions we - # schedule a stop. Otherwise the RM will stop immediately - self.schedule("0s", rm.stop_with_conditions) + if rm.conditions.get(ResourceAction.STOP): + # Only if the RM has STOP conditions we + # schedule a stop. Otherwise the RM will stop immediately + self.schedule("0s", rm.stop_with_conditions) def release(self, guids = None): """ Releases all ResourceManagers in the guids list. @@ -787,20 +1033,24 @@ class ExperimentController(object): :type guids: list """ + if isinstance(guids, int): + guids = [guids] + if not guids: guids = self.resources - # Remove all pending tasks from the scheduler queue - for tid in list(self._scheduler.pending): - self._scheduler.remove(tid) - - self._runner.empty() - for guid in guids: rm = self.get_resource(guid) self.schedule("0s", rm.release) self.wait_released(guids) + + if self.persist: + self.save() + + for guid in guids: + if self.get(guid, "hardRelease"): + self.remove_resource(guid) def shutdown(self): """ Releases all resources and stops the ExperimentController @@ -810,6 +1060,13 @@ class ExperimentController(object): if self._state == ECState.FAILED: raise RuntimeError("EC failure. Can not exit gracefully") + # Remove all pending tasks from the scheduler queue + for tid in list(self._scheduler.pending): + self._scheduler.remove(tid) + + # Remove pending tasks from the workers queue + self._runner.empty() + self.release() # Mark the EC state as TERMINATED @@ -896,6 +1153,8 @@ class ExperimentController(object): """ + self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads))) + self._runner = ParallelRun(maxthreads = self.nthreads) self._runner.start() while not self._stop: @@ -951,11 +1210,10 @@ class ExperimentController(object): :type task: Task """ - # Invoke callback - task.status = TaskStatus.DONE - try: + # Invoke callback task.result = task.callback() + task.status = TaskStatus.DONE except: import traceback err = traceback.format_exc() @@ -973,3 +1231,19 @@ class ExperimentController(object): self._cond.notify() self._cond.release() + def _build_from_netgraph(self, add_node_callback, add_edge_callback, + **kwargs): + """ Automates experiment description using a NetGraph instance. + """ + self._netgraph = NetGraph(**kwargs) + + if add_node_callback: + ### Add resources to the EC + for nid in self.netgraph.nodes(): + add_node_callback(self, nid) + + if add_edge_callback: + #### Add connections between resources + for nid1, nid2 in self.netgraph.edges(): + add_edge_callback(self, nid1, nid2) +