X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=f282ac6fa0956633308cf71e491f8b0cc5c80621;hb=a3073450d0ef8f8becb9de04b64bf5d4fdbc2272;hp=5d0363e3e19210542ee72512273a030be15e9faf;hpb=97f98e73d96e1e3106ba62f7a29f2ff7353e4c95;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 5d0363e3..f282ac6f 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -17,181 +16,597 @@ # # Author: Alina Quereilhac -import functools -import logging -import os -import random -import sys -import time -import threading +from six import next -from nepi.util import guid from nepi.util.parallel import ParallelRun -from nepi.util.timefuncs import strfnow, strfdiff, strfvalid +from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat from nepi.execution.resource import ResourceFactory, ResourceAction, \ ResourceState, ResourceState2str from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus from nepi.execution.trace import TraceAttr +from nepi.util.serializer import ECSerializer, SFormats +from nepi.util.plotter import ECPlotter, PFormats +from nepi.util.netgraph import NetGraph, TopologyType # TODO: use multiprocessing instead of threading -# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!! +# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode) + +import functools +import logging +import os +import sys +import tempfile +import time +import threading +import weakref + +class FailureLevel(object): + """ Possible failure states for the experiment """ + OK = 1 + RM_FAILURE = 2 + EC_FAILURE = 3 + +class FailureManager(object): + """ The FailureManager is responsible for handling errors + and deciding whether an experiment should be aborted or not + """ + + def __init__(self): + self._ec = None + self._failure_level = FailureLevel.OK + self._abort = False + + def set_ec(self, ec): + self._ec = weakref.ref(ec) + + @property + def ec(self): + """ Returns the ExperimentController associated to this FailureManager + """ + return self._ec() + + @property + def abort(self): + return self._abort + + def eval_failure(self, guid): + """ Implements failure policy and sets the abort state of the + experiment based on the failure state and criticality of + the RM + + :param guid: Guid of the RM upon which the failure of the experiment + is evaluated + :type guid: int + + """ + if self._failure_level == FailureLevel.OK: + rm = self.ec.get_resource(guid) + state = rm.state + critical = rm.get("critical") + + if state == ResourceState.FAILED and critical: + self._failure_level = FailureLevel.RM_FAILURE + self._abort = True + self.ec.logger.debug("RM critical failure occurred on guid %d." 
\ + " Setting EC FAILURE LEVEL to RM_FAILURE" % guid) + + def set_ec_failure(self): + self._failure_level = FailureLevel.EC_FAILURE class ECState(object): - """ State of the Experiment Controller + """ Possible states of the ExperimentController """ RUNNING = 1 FAILED = 2 - TERMINATED = 3 + RELEASED = 3 + TERMINATED = 4 + +# historical note: this class used to be in util/guid.py but is used only here +# FIXME: This class is not thread-safe. Should it be made thread-safe? +class GuidGenerator(object): + def __init__(self): + self._last_guid = 0 + + # historical note: this used to be called `next` + # which confused 2to3 - and me - while it has + # nothing to do at all with the iteration protocol + def generate(self, guid = None): + if guid == None: + guid = self._last_guid + 1 + + self._last_guid = self._last_guid if guid <= self._last_guid else guid + + return guid class ExperimentController(object): """ - .. class:: Class Args : - - :param exp_id: Human readable identifier for the experiment. - It will be used in the name of the directory - where experiment related information is stored - :type exp_id: int + .. note:: - :param root_dir: Root directory where experiment specific folder - will be created to store experiment information - :type root_dir: str + An experiment, or scenario, is defined by a concrete set of resources, + and the behavior, configuration and interconnection of those resources. + The Experiment Description (ED) is a detailed representation of a + single experiment. It contains all the necessary information to + allow repeating the experiment. NEPI allows to describe + experiments by registering components (resources), configuring them + and interconnecting them. + + A same experiment (scenario) can be executed many times, generating + different results. We call an experiment execution (instance) a 'run'. + + The ExperimentController (EC), is the entity responsible of + managing an experiment run. The same scenario can be + recreated (and re-run) by instantiating an EC and recreating + the same experiment description. + + An experiment is represented as a graph of interconnected + resources. A resource is a generic concept in the sense that any + component taking part of an experiment, whether physical of + virtual, is considered a resource. A resources could be a host, + a virtual machine, an application, a simulator, a IP address. + + A ResourceManager (RM), is the entity responsible for managing a + single resource. ResourceManagers are specific to a resource + type (i.e. An RM to control a Linux application will not be + the same as the RM used to control a ns-3 simulation). + To support a new type of resource, a new RM must be implemented. + NEPI already provides a variety of RMs to control basic resources, + and new can be extended from the existing ones. + + Through the EC interface the user can create ResourceManagers (RMs), + configure them and interconnect them, to describe an experiment. + Describing an experiment through the EC does not run the experiment. + Only when the 'deploy()' method is invoked on the EC, the EC will take + actions to transform the 'described' experiment into a 'running' experiment. + + While the experiment is running, it is possible to continue to + create/configure/connect RMs, and to deploy them to involve new + resources in the experiment (this is known as 'interactive' deployment). + + An experiments in NEPI is identified by a string id, + which is either given by the user, or automatically generated by NEPI. 
+ The purpose of this identifier is to separate files and results that + belong to different experiment scenarios. + However, since a same 'experiment' can be run many times, the experiment + id is not enough to identify an experiment instance (run). + For this reason, the ExperimentController has two identifier, the + exp_id, which can be re-used in different ExperimentController, + and the run_id, which is unique to one ExperimentController instance, and + is automatically generated by NEPI. + + """ - .. note:: - The ExperimentController (EC), is the entity responsible for - managing a single experiment. - Through the EC interface the user can create ResourceManagers (RMs), - configure them and interconnect them, in order to describe the experiment. + @classmethod + def load(cls, filepath, format = SFormats.XML): + serializer = ECSerializer() + ec = serializer.load(filepath) + return ec + + def __init__(self, exp_id = None, local_dir = None, persist = False, + fm = None, add_node_callback = None, add_edge_callback = None, + **kwargs): + """ ExperimentController entity to model an execute a network + experiment. - Only when the 'deploy()' method is invoked, the EC will take actions - to transform the 'described' experiment into a 'running' experiment. + :param exp_id: Human readable name to identify the experiment + :type exp_id: str - While the experiment is running, it is possible to continue to - create/configure/connect RMs, and to deploy them to involve new - resources in the experiment. + :param local_dir: Path to local directory where to store experiment + related files + :type local_dir: str - """ + :param persist: Save an XML description of the experiment after + completion at local_dir + :type persist: bool + + :param fm: FailureManager object. If None is given, the default + FailureManager class will be used + :type fm: FailureManager + + :param add_node_callback: Callback to invoke for node instantiation + when automatic topology creation mode is used + :type add_node_callback: function + + :param add_edge_callback: Callback to invoke for edge instantiation + when automatic topology creation mode is used + :type add_edge_callback: function - def __init__(self, exp_id = None, root_dir = "/tmp"): + """ super(ExperimentController, self).__init__() - # root directory to store files - self._root_dir = root_dir - # experiment identifier given by the user - self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex') + # Logging + self._logger = logging.getLogger("ExperimentController") + + # Run identifier. It identifies a concrete execution instance (run) + # of an experiment. + # Since a same experiment (same configuration) can be executed many + # times, this run_id permits to separate result files generated on + # different experiment executions + self._run_id = tsformat() + + # Experiment identifier. Usually assigned by the user + # Identifies the experiment scenario (i.e. 
configuration, + # resources used, etc) + self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex') + + # Local path where to store experiment related files (results, etc) + if not local_dir: + local_dir = tempfile.gettempdir() # /tmp + + self._local_dir = local_dir + self._exp_dir = os.path.join(local_dir, self.exp_id) + self._run_dir = os.path.join(self.exp_dir, self.run_id) + + # If True persist the experiment controller in XML format, after completion + self._persist = persist # generator of globally unique ids - self._guid_generator = guid.GuidGenerator() + self._guid_generator = GuidGenerator() # Resource managers self._resources = dict() - # Scheduler + # Scheduler. It a queue that holds tasks scheduled for + # execution, and yields the next task to be executed + # ordered by execution and arrival time self._scheduler = HeapScheduler() # Tasks self._tasks = dict() + # RM groups (for deployment) + self._groups = dict() + + # generator of globally unique id for groups + self._group_id_generator = GuidGenerator() + + # Flag to stop processing thread + self._stop = False + + # Entity in charge of managing system failures + if not fm: + self._fm = FailureManager() + self._fm.set_ec(self) + + # EC state + self._state = ECState.RUNNING + + # Automatically construct experiment description + self._netgraph = None + if add_node_callback or add_edge_callback or kwargs.get("topology"): + self._build_from_netgraph(add_node_callback, add_edge_callback, + **kwargs) + + # The runner is a pool of threads used to parallelize + # execution of tasks + self._nthreads = 20 + self._runner = None + # Event processing thread self._cond = threading.Condition() self._thread = threading.Thread(target = self._process) self._thread.setDaemon(True) self._thread.start() + + @property + def logger(self): + """ Returns the logger instance of the Experiment Controller - # EC state - self._state = ECState.RUNNING + """ + return self._logger - # Logging - self._logger = logging.getLogger("ExperimentController") + @property + def fm(self): + """ Returns the failure manager + + """ + + return self._fm @property - def logger(self): - """ Return the logger of the Experiment Controller + def failure_level(self): + """ Returns the level of FAILURE of th experiment """ - return self._logger + + return self._fm._failure_level @property def ecstate(self): - """ Return the state of the Experiment Controller + """ Returns the state of the Experiment Controller """ return self._state @property def exp_id(self): - """ Return the experiment ID + """ Returns the experiment id assigned by the user """ - exp_id = self._exp_id - if not exp_id.startswith("nepi-"): - exp_id = "nepi-" + exp_id - return exp_id + return self._exp_id @property - def finished(self): - """ Put the state of the Experiment Controller into a final state : - Either TERMINATED or FAILED + def run_id(self): + """ Returns the experiment instance (run) identifier (automatically + generated) """ - return self.ecstate in [ECState.FAILED, ECState.TERMINATED] + return self._run_id + + @property + def nthreads(self): + """ Returns the number of processing nthreads used + + """ + return self._nthreads + + @property + def local_dir(self): + """ Root local directory for experiment files + + """ + return self._local_dir + + @property + def exp_dir(self): + """ Local directory to store results and other files related to the + experiment. 
+ + """ + return self._exp_dir + + @property + def run_dir(self): + """ Local directory to store results and other files related to the + experiment run. + + """ + return self._run_dir + + @property + def persist(self): + """ If True, persists the ExperimentController to XML format upon + experiment completion + + """ + return self._persist + + @property + def netgraph(self): + """ Return NetGraph instance if experiment description was automatically + generated + + """ + return self._netgraph + + @property + def abort(self): + """ Returns True if the experiment has failed and should be interrupted, + False otherwise. + + """ + return self._fm.abort + + def inform_failure(self, guid): + """ Reports a failure in a RM to the EC for evaluation + + :param guid: Resource id + :type guid: int + + """ + + return self._fm.eval_failure(guid) def wait_finished(self, guids): - """ Blocking method that wait until all the RM from the 'guid' list - reach the state FINISHED + """ Blocking method that waits until all RMs in the 'guids' list + have reached a state >= STOPPED (i.e. STOPPED, FAILED or + RELEASED ), or until a failure in the experiment occurs + (i.e. abort == True) + + :param guids: List of guids + :type guids: list + + """ + + def quit(): + return self.abort + + return self.wait(guids, state = ResourceState.STOPPED, + quit = quit) + + def wait_started(self, guids): + """ Blocking method that waits until all RMs in the 'guids' list + have reached a state >= STARTED, or until a failure in the + experiment occurs (i.e. abort == True) + + :param guids: List of guids + :type guids: list + + """ + + def quit(): + return self.abort + + return self.wait(guids, state = ResourceState.STARTED, + quit = quit) + + def wait_released(self, guids): + """ Blocking method that waits until all RMs in the 'guids' list + have reached a state == RELEASED, or until the EC fails + + :param guids: List of guids + :type guids: list + + """ + + def quit(): + return self._state == ECState.FAILED - :param guids: List of guids - :type guids: list + return self.wait(guids, state = ResourceState.RELEASED, + quit = quit) + + def wait_deployed(self, guids): + """ Blocking method that waits until all RMs in the 'guids' list + have reached a state >= READY, or until a failure in the + experiment occurs (i.e. 
abort == True) + + :param guids: List of guids + :type guids: list + + """ + + def quit(): + return self.abort + + return self.wait(guids, state = ResourceState.READY, + quit = quit) + + def wait(self, guids, state, quit): + """ Blocking method that waits until all RMs in the 'guids' list + have reached a state >= 'state', or until the 'quit' callback + yields True + + :param guids: List of guids + :type guids: list + """ if isinstance(guids, int): guids = [guids] - while not all([self.state(guid) in [ResourceState.FINISHED, - ResourceState.STOPPED, - ResourceState.FAILED] \ - for guid in guids]) and not self.finished: - # We keep the sleep as large as possible to - # decrese the number of RM state requests - time.sleep(2) - + # Make a copy to avoid modifying the original guids list + guids = list(guids) + + while True: + # If there are no more guids to wait for + # or the quit function returns True, exit the loop + if len(guids) == 0 or quit(): + break + + # If a guid reached one of the target states, remove it from list + guid = guids.pop() + rm = self.get_resource(guid) + rstate = rm.state + + if rstate >= state: + self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % ( + rm.get_rtype(), guid, rstate, state)) + else: + # Debug... + self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % ( + guid, rstate, state)) + + guids.append(guid) + + time.sleep(0.5) + + def plot(self, dirpath = None, format= PFormats.FIGURE, show = False): + plotter = ECPlotter() + fpath = plotter.plot(self, dirpath = dirpath, format= format, + show = show) + return fpath + + def serialize(self, format = SFormats.XML): + serializer = ECSerializer() + sec = serializer.load(self, format = format) + return sec + + def save(self, dirpath = None, format = SFormats.XML): + if dirpath == None: + dirpath = self.run_dir + + try: + os.makedirs(dirpath) + except OSError: + pass + + serializer = ECSerializer() + path = serializer.save(self, dirpath, format = format) + return path + def get_task(self, tid): - """ Get a specific task + """ Returns a task by its id - :param tid: Id of the task - :type tid: int - :rtype: unknow + :param tid: Id of the task + :type tid: int + + :rtype: Task + """ return self._tasks.get(tid) def get_resource(self, guid): - """ Get a specific Resource Manager + """ Returns a registered ResourceManager by its guid - :param guid: Id of the task - :type guid: int - :rtype: ResourceManager + :param guid: Id of the resource + :type guid: int + + :rtype: ResourceManager + """ - return self._resources.get(guid) + rm = self._resources.get(guid) + return rm + + def get_resources_by_type(self, rtype): + """ Returns the ResourceManager objects of type rtype + + :param rtype: Resource type + :type rtype: string + + :rtype: list of ResourceManagers + + """ + rms = [] + for guid, rm in self._resources.items(): + if rm.get_rtype() == rtype: + rms.append(rm) + return rms + + def remove_resource(self, guid): + del self._resources[guid] @property def resources(self): - """ Returns the list of all the Resource Manager Id + """ Returns the guids of all ResourceManagers + + :return: Set of all RM guids + :rtype: list + + """ + keys = list(self._resources.keys()) + + return keys - :rtype: set + def filter_resources(self, rtype): + """ Returns the guids of all ResourceManagers of type rtype + :param rtype: Resource type + :type rtype: string + + :rtype: list of guids + """ - return self._resources.keys() + rms = [] + for guid, rm in self._resources.items(): + if rm.get_rtype() == 
rtype: + rms.append(rm.guid) + return rms def register_resource(self, rtype, guid = None): - """ Register a Resource Manager. It creates a new 'guid', if it is not specified, - for the RM of type 'rtype' and add it to the list of Resources. + """ Registers a new ResourceManager of type 'rtype' in the experiment + + This method will assign a new 'guid' for the RM, if no guid + is specified. - :param rtype: Type of the RM - :type rtype: str - :return: Id of the RM - :rtype: int + :param rtype: Type of the RM + :type rtype: str + + :return: Guid of the RM + :rtype: int + """ # Get next available guid - guid = self._guid_generator.next(guid) + # xxx_next_hiccup + guid = self._guid_generator.generate(guid) # Instantiate RM rm = ResourceFactory.create(rtype, self, guid) @@ -202,97 +617,155 @@ class ExperimentController(object): return guid def get_attributes(self, guid): - """ Return all the attibutes of a specific RM + """ Returns all the attributes of the RM with guid 'guid' + + :param guid: Guid of the RM + :type guid: int + + :return: List of attributes + :rtype: list - :param guid: Guid of the RM - :type guid: int - :return: List of attributes - :rtype: list """ rm = self.get_resource(guid) return rm.get_attributes() + def get_attribute(self, guid, name): + """ Returns the attribute 'name' of the RM with guid 'guid' + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :return: The attribute with name 'name' + :rtype: Attribute + + """ + rm = self.get_resource(guid) + return rm.get_attribute(name) + def register_connection(self, guid1, guid2): - """ Registers a guid1 with a guid2. - The declaration order is not important + """ Registers a connection between a RM with guid 'guid1' + and another RM with guid 'guid2'. + + The order of the in which the two guids are provided is not + important, since the connection relationship is symmetric. :param guid1: First guid to connect :type guid1: ResourceManager :param guid2: Second guid to connect :type guid: ResourceManager + """ rm1 = self.get_resource(guid1) rm2 = self.get_resource(guid2) - rm1.connect(guid2) - rm2.connect(guid1) + rm1.register_connection(guid2) + rm2.register_connection(guid1) - def register_condition(self, group1, action, group2, state, + def register_condition(self, guids1, action, guids2, state, time = None): - """ Registers an action START or STOP for all RM on group1 to occur - time 'time' after all elements in group2 reached state 'state'. + """ Registers an action START, STOP or DEPLOY for all RM on list + guids1 to occur at time 'time' after all elements in list guids2 + have reached state 'state'. 
- :param group1: List of guids of RMs subjected to action - :type group1: list + :param guids1: List of guids of RMs subjected to action + :type guids1: list - :param action: Action to register (either START or STOP) + :param action: Action to perform (either START, STOP or DEPLOY) :type action: ResourceAction - :param group2: List of guids of RMs to we waited for - :type group2: list + :param guids2: List of guids of RMs to we waited for + :type guids2: list - :param state: State to wait for on RMs (STARTED, STOPPED, etc) + :param state: State to wait for on RMs of list guids2 (STARTED, + STOPPED, etc) :type state: ResourceState - :param time: Time to wait after group2 has reached status + :param time: Time to wait after guids2 has reached status :type time: string """ - if isinstance(group1, int): - group1 = [group1] - if isinstance(group2, int): - group2 = [group2] + if isinstance(guids1, int): + guids1 = [guids1] + if isinstance(guids2, int): + guids2 = [guids2] - for guid1 in group1: + for guid1 in guids1: rm = self.get_resource(guid1) - rm.register_condition(action, group2, state, time) + rm.register_condition(action, guids2, state, time) + + def enable_trace(self, guid, name): + """ Enables a trace to be collected during the experiment run - def register_trace(self, guid, name): - """ Enable trace + :param name: Name of the trace + :type name: str - :param name: Name of the trace - :type name: str """ rm = self.get_resource(guid) - rm.register_trace(name) + rm.enable_trace(name) + + def trace_enabled(self, guid, name): + """ Returns True if the trace of name 'name' is enabled + + :param name: Name of the trace + :type name: str + + """ + rm = self.get_resource(guid) + return rm.trace_enabled(name) def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0): - """ Get information on collected trace + """ Returns information on a collected trace, the trace stream or + blocks (chunks) of the trace stream - :param name: Name of the trace - :type name: str + :param name: Name of the trace + :type name: str - :param attr: Can be one of: + :param attr: Can be one of: - TraceAttr.ALL (complete trace content), - - TraceAttr.STREAM (block in bytes to read starting at offset), + - TraceAttr.STREAM (block in bytes to read starting + at offset), - TraceAttr.PATH (full path to the trace file), - TraceAttr.SIZE (size of trace file). - :type attr: str + :type attr: str - :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM - :type name: int + :param block: Number of bytes to retrieve from trace, when attr is + TraceAttr.STREAM + :type name: int - :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM - :type name: int + :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM + :type name: int + + :rtype: str - :rtype: str """ rm = self.get_resource(guid) return rm.trace(name, attr, block, offset) + def get_traces(self, guid): + """ Returns the list of the trace names of the RM with guid 'guid' + + :param guid: Guid of the RM + :type guid: int + + :return: List of trace names + :rtype: list + + """ + rm = self.get_resource(guid) + return rm.get_traces() + + def discover(self, guid): - """ Discover a specific RM defined by its 'guid' + """ Discovers an available resource matching the criteria defined + by the RM with guid 'guid', and associates that resource to the RM + + Not all RM types require (or are capable of) performing resource + discovery. 
For the RM types which are not capable of doing so, + invoking this method does not have any consequences. :param guid: Guid of the RM :type guid: int @@ -302,7 +775,12 @@ class ExperimentController(object): return rm.discover() def provision(self, guid): - """ Provision a specific RM defined by its 'guid' + """ Provisions the resource associated to the RM with guid 'guid'. + + Provisioning means making a resource 'accessible' to the user. + Not all RM types require (or are capable of) performing resource + provisioning. For the RM types which are not capable of doing so, + invoking this method does not have any consequences. :param guid: Guid of the RM :type guid: int @@ -312,33 +790,68 @@ class ExperimentController(object): return rm.provision() def get(self, guid, name): - """ Get a specific attribute 'name' from the RM 'guid' + """ Returns the value of the attribute with name 'name' on the + RM with guid 'guid' :param guid: Guid of the RM :type guid: int - :param name: attribute's name + :param name: Name of the attribute :type name: str + :return: The value of the attribute with name 'name' + """ rm = self.get_resource(guid) return rm.get(name) def set(self, guid, name, value): - """ Set a specific attribute 'name' from the RM 'guid' - with the value 'value' + """ Modifies the value of the attribute with name 'name' on the + RM with guid 'guid'. :param guid: Guid of the RM :type guid: int - :param name: attribute's name + :param name: Name of the attribute :type name: str - :param value: attribute's value + :param value: Value of the attribute """ rm = self.get_resource(guid) - return rm.set(name, value) + rm.set(name, value) + + def get_global(self, rtype, name): + """ Returns the value of the global attribute with name 'name' on the + RMs of rtype 'rtype'. + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :return: The value of the attribute with name 'name' + + """ + rclass = ResourceFactory.get_resource_type(rtype) + return rclass.get_global(name) + + def set_global(self, rtype, name, value): + """ Modifies the value of the global attribute with name 'name' on the + RMs of with rtype 'rtype'. + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :param value: Value of the attribute + + """ + rclass = ResourceFactory.get_resource_type(rtype) + return rclass.set_global(name, value) def state(self, guid, hr = False): """ Returns the state of a resource @@ -352,13 +865,18 @@ class ExperimentController(object): """ rm = self.get_resource(guid) + state = rm.state + if hr: - return ResourceState2str.get(rm.state) + return ResourceState2str.get(state) - return rm.state + return state def stop(self, guid): - """ Stop a specific RM defined by its 'guid' + """ Stops the RM with guid 'guid' + + Stopping a RM means that the resource it controls will + no longer take part of the experiment. :param guid: Guid of the RM :type guid: int @@ -368,7 +886,10 @@ class ExperimentController(object): return rm.stop() def start(self, guid): - """ Start a specific RM defined by its 'guid' + """ Starts the RM with guid 'guid' + + Starting a RM means that the resource it controls will + begin taking part of the experiment. 
:param guid: Guid of the RM :type guid: int @@ -377,11 +898,46 @@ class ExperimentController(object): rm = self.get_resource(guid) return rm.start() - def set_with_conditions(self, name, value, group1, group2, state, + def get_start_time(self, guid): + """ Returns the start time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.start_time + + def get_stop_time(self, guid): + """ Returns the stop time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.stop_time + + def get_discover_time(self, guid): + """ Returns the discover time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.discover_time + + def get_provision_time(self, guid): + """ Returns the provision time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.provision_time + + def get_ready_time(self, guid): + """ Returns the deployment time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.ready_time + + def get_release_time(self, guid): + """ Returns the release time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.release_time + + def get_failed_time(self, guid): + """ Returns the time failure occured for the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.failed_time + + def set_with_conditions(self, name, value, guids1, guids2, state, time = None): - """ Set value 'value' on attribute with name 'name' on all RMs of - group1 when 'time' has elapsed since all elements in group2 - have reached state 'state'. + """ Modifies the value of attribute with name 'name' on all RMs + on the guids1 list when time 'time' has elapsed since all + elements in guids2 list have reached state 'state'. :param name: Name of attribute to set in RM :type name: string @@ -389,88 +945,83 @@ class ExperimentController(object): :param value: Value of attribute to set in RM :type name: string - :param group1: List of guids of RMs subjected to action - :type group1: list + :param guids1: List of guids of RMs subjected to action + :type guids1: list :param action: Action to register (either START or STOP) :type action: ResourceAction - :param group2: List of guids of RMs to we waited for - :type group2: list + :param guids2: List of guids of RMs to we waited for + :type guids2: list :param state: State to wait for on RMs (STARTED, STOPPED, etc) :type state: ResourceState - :param time: Time to wait after group2 has reached status + :param time: Time to wait after guids2 has reached status :type time: string """ - if isinstance(group1, int): - group1 = [group1] - if isinstance(group2, int): - group2 = [group2] + if isinstance(guids1, int): + guids1 = [guids1] + if isinstance(guids2, int): + guids2 = [guids2] - for guid1 in group1: + for guid1 in guids1: rm = self.get_resource(guid) - rm.set_with_conditions(name, value, group2, state, time) - - def stop_with_conditions(self, guid): - """ Stop a specific RM defined by its 'guid' only if all the conditions are true - - :param guid: Guid of the RM - :type guid: int + rm.set_with_conditions(name, value, guids2, state, time) - """ - rm = self.get_resource(guid) - return rm.stop_with_conditions() + def deploy(self, guids = None, wait_all_ready = True, group = None): + """ Deploys all ResourceManagers in the guids list. + + If the argument 'guids' is not given, all RMs with state NEW + are deployed. 
- def start_with_conditions(self, guid): - """ Start a specific RM defined by its 'guid' only if all the conditions are true + :param guids: List of guids of RMs to deploy + :type guids: list - :param guid: Guid of the RM + :param wait_all_ready: Wait until all RMs are ready in + order to start the RMs :type guid: int - """ - rm = self.get_resource(guid) - return rm.start_with_condition() - - def deploy(self, group = None, wait_all_ready = True): - """ Deploy all resource manager in group - - :param group: List of guids of RMs to deploy - :type group: list - - :param wait_all_ready: Wait until all RMs are ready in - order to start the RMs - :type guid: int + :param group: Id of deployment group in which to deploy RMs + :type group: int """ self.logger.debug(" ------- DEPLOY START ------ ") + if not guids: + # If no guids list was passed, all 'NEW' RMs will be deployed + guids = [] + for guid, rm in self._resources.items(): + if rm.state == ResourceState.NEW: + guids.append(guid) + + if isinstance(guids, int): + guids = [guids] + + # Create deployment group + # New guids can be added to a same deployment group later on + new_group = False if not group: - group = self.resources - - if isinstance(group, int): - group = [group] - - # Before starting deployment we disorder the group list with the - # purpose of speeding up the whole deployment process. - # It is likely that the user inserted in the 'group' list closely - # resources one after another (e.g. all applications - # connected to the same node can likely appear one after another). - # This can originate a slow down in the deployment since the N - # threads the parallel runner uses to processes tasks may all - # be taken up by the same family of resources waiting for the - # same conditions (e.g. LinuxApplications running on a same - # node share a single lock, so they will tend to be serialized). - # If we disorder the group list, this problem can be mitigated. - #random.shuffle(group) + new_group = True + # xxx_next_hiccup + group = self._group_id_generator.generate() + + if group not in self._groups: + self._groups[group] = [] + + self._groups[group].extend(guids) def wait_all_and_start(group): + # Function that checks if all resources are READY + # before scheduling a start_with_conditions for each RM reschedule = False - for guid in group: - rm = self.get_resource(guid) - if rm.state < ResourceState.READY: + + # Get all guids in group + guids = self._groups[group] + + for guid in guids: + if self.state(guid) < ResourceState.READY: reschedule = True break @@ -478,68 +1029,97 @@ class ExperimentController(object): callback = functools.partial(wait_all_and_start, group) self.schedule("1s", callback) else: - # If all resources are read, we schedule the start - for guid in group: + # If all resources are ready, we schedule the start + for guid in guids: rm = self.get_resource(guid) self.schedule("0s", rm.start_with_conditions) - if wait_all_ready: - # Schedule the function that will check all resources are - # READY, and only then it will schedule the start. - # This is aimed to reduce the number of tasks looping in the scheduler. - # Intead of having N start tasks, we will have only one + if rm.conditions.get(ResourceAction.STOP): + # Only if the RM has STOP conditions we + # schedule a stop. Otherwise the RM will stop immediately + self.schedule("0s", rm.stop_with_conditions) + + if wait_all_ready and new_group: + # Schedule a function to check that all resources are + # READY, and only then schedule the start. 
+ # This aims at reducing the number of tasks looping in the + # scheduler. + # Instead of having many start tasks, we will have only one for + # the whole group. callback = functools.partial(wait_all_and_start, group) - self.schedule("1s", callback) + self.schedule("0s", callback) - for guid in group: + for guid in guids: rm = self.get_resource(guid) - self.schedule("0s", rm.deploy) + rm.deployment_group = group + self.schedule("0s", rm.deploy_with_conditions) if not wait_all_ready: - self.schedule("1s", rm.start_with_conditions) + self.schedule("0s", rm.start_with_conditions) - if rm.conditions.get(ResourceAction.STOP): - # Only if the RM has STOP conditions we - # schedule a stop. Otherwise the RM will stop immediately - self.schedule("2s", rm.stop_with_conditions) + if rm.conditions.get(ResourceAction.STOP): + # Only if the RM has STOP conditions we + # schedule a stop. Otherwise the RM will stop immediately + self.schedule("0s", rm.stop_with_conditions) + def release(self, guids = None): + """ Releases all ResourceManagers in the guids list. - def release(self, group = None): - """ Release the elements of the list 'group' or - all the resources if any group is specified + If the argument 'guids' is not given, all RMs registered + in the experiment are released. - :param group: List of RM - :type group: list + :param guids: List of RM guids + :type guids: list """ - if not group: - group = self.resources + if self._state == ECState.RELEASED: + return + + if isinstance(guids, int): + guids = [guids] + + if not guids: + guids = self.resources - threads = [] - for guid in group: + for guid in guids: rm = self.get_resource(guid) - thread = threading.Thread(target=rm.release) - threads.append(thread) - thread.setDaemon(True) - thread.start() - - while list(threads) and not self.finished: - thread = threads[0] - # Time out after 5 seconds to check EC not terminated - thread.join(5) - if not thread.is_alive(): - threads.remove(thread) + self.schedule("0s", rm.release) + + self.wait_released(guids) + + if self.persist: + self.save() + + for guid in guids: + if self.get(guid, "hardRelease"): + self.remove_resource(guid)\ + + # Mark the EC state as RELEASED + self._state = ECState.RELEASED def shutdown(self): - """ Shutdown the Experiment Controller. - Releases all the resources and stops task processing thread + """ Releases all resources and stops the ExperimentController """ + # If there was a major failure we can't exit gracefully + if self._state == ECState.FAILED: + raise RuntimeError("EC failure. Can not exit gracefully") + + # Remove all pending tasks from the scheduler queue + for tid in list(self._scheduler.pending): + self._scheduler.remove(tid) + + # Remove pending tasks from the workers queue + self._runner.empty() + self.release() # Mark the EC state as TERMINATED self._state = ECState.TERMINATED + # Stop processing thread + self._stop = True + # Notify condition to wake up the processing thread self._notify() @@ -547,7 +1127,7 @@ class ExperimentController(object): self._thread.join() def schedule(self, date, callback, track = False): - """ Schedule a callback to be executed at time date. + """ Schedules a callback to be executed at time 'date'. :param date: string containing execution time for the task. It can be expressed as an absolute time, using @@ -558,19 +1138,20 @@ class ExperimentController(object): Python function, and receives args and kwargs as arguments. 
- :param track: if set to True, the task will be retrivable with + :param track: if set to True, the task will be retrievable with the get_task() method :return : The Id of the task + :rtype: int + """ - timestamp = strfvalid(date) - + timestamp = stabsformat(date) task = Task(timestamp, callback) task = self._scheduler.schedule(task) if track: self._tasks[task.id] = task - + # Notify condition to wake up the processing thread self._notify() @@ -579,119 +1160,135 @@ class ExperimentController(object): def _process(self): """ Process scheduled tasks. - The _process method is executed in an independent thread held by the - ExperimentController for as long as the experiment is running. + .. note:: - Tasks are scheduled by invoking the schedule method with a target callback. - The schedule method is given a execution time which controls the - order in which tasks are processed. - - Tasks are processed in parallel using multithreading. + Tasks are scheduled by invoking the schedule method with a target + callback and an execution time. + The schedule method creates a new Task object with that callback + and execution time, and pushes it into the '_scheduler' queue. + The execution time and the order of arrival of tasks are used + to order the tasks in the queue. + + The _process method is executed in an independent thread held by + the ExperimentController for as long as the experiment is running. + This method takes tasks from the '_scheduler' queue in a loop + and processes them in parallel using multithreading. The environmental variable NEPI_NTHREADS can be used to control - the number of threads used to process tasks. The default value is 50. - - Exception handling: - - To execute tasks in parallel, an ParallelRunner (PR) object, holding - a pool of threads (workers), is used. - For each available thread in the PR, the next task popped from - the scheduler queue is 'put' in the PR. - Upon receiving a task to execute, each PR worker (thread) invokes the - _execute method of the EC, passing the task as argument. - This method, calls task.callback inside a try/except block. If an - exception is raised by the tasks.callback, it will be trapped by the - try block, logged to standard error (usually the console), and the EC - state will be set to ECState.FAILED. - The invocation of _notify immediately after, forces the processing - loop in the _process method, to wake up if it was blocked waiting for new - tasks to arrived, and to check the EC state. - As the EC is in FAILED state, the processing loop exits and the - 'finally' block is invoked. In the 'finally' block, the 'sync' method - of the PR is invoked, which forces the PR to raise any unchecked errors - that might have been raised by the workers. - - """ - nthreads = int(os.environ.get("NEPI_NTHREADS", "50")) - - runner = ParallelRun(maxthreads = nthreads) - runner.start() + the number of threads used to process tasks. The default value is + 50. - try: - while not self.finished: + To execute tasks in parallel, a ParallelRunner (PR) object is used. + This object keeps a pool of threads (workers), and a queue of tasks + scheduled for 'immediate' execution. + + On each iteration, the '_process' loop will take the next task that + is scheduled for 'future' execution from the '_scheduler' queue, + and if the execution time of that task is >= to the current time, + it will push that task into the PR for 'immediate execution'. + As soon as a worker is free, the PR will assign the next task to + that worker. 
+ + Upon receiving a task to execute, each PR worker (thread) will + invoke the _execute method of the EC, passing the task as + argument. + The _execute method will then invoke task.callback inside a + try/except block. If an exception is raised by the tasks.callback, + it will be trapped by the try block, logged to standard error + (usually the console), and the task will be marked as failed. + + """ + + self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads))) + self._runner = ParallelRun(maxthreads = self.nthreads) + self._runner.start() + + while not self._stop: + try: self._cond.acquire() - task = self._scheduler.next() - self._cond.release() + + task = next(self._scheduler) if not task: - # It there are not tasks in the tasks queue we need to - # wait until a call to schedule wakes us up - self._cond.acquire() + # No task to execute. Wait for a new task to be scheduled. self._cond.wait() - self._cond.release() - else: - # If the task timestamp is in the future the thread needs to wait - # until time elapse or until another task is scheduled - now = strfnow() + else: + # The task timestamp is in the future. Wait for timeout + # or until another task is scheduled. + now = tnow() if now < task.timestamp: - # Calculate time difference in seconds - timeout = strfdiff(task.timestamp, now) + # Calculate timeout in seconds + timeout = tdiffsec(task.timestamp, now) + # Re-schedule task with the same timestamp self._scheduler.schedule(task) - # Sleep until timeout or until a new task awakes the condition - self._cond.acquire() + + task = None + + # Wait timeout or until a new task awakes the condition self._cond.wait(timeout) - self._cond.release() - else: - # Process tasks in parallel - runner.put(self._execute, task) - except: - import traceback - err = traceback.format_exc() - self._logger.error("Error while processing tasks in the EC: %s" % err) + + self._cond.release() - self._state = ECState.FAILED - finally: - runner.sync() + if task: + # Process tasks in parallel + self._runner.put(self._execute, task) + except: + import traceback + err = traceback.format_exc() + self.logger.error("Error while processing tasks in the EC: %s" % err) + + # Set the EC to FAILED state + self._state = ECState.FAILED + + # Set the FailureManager failure level to EC failure + self._fm.set_ec_failure() + + self.logger.debug("Exiting the task processing loop ... ") + + self._runner.sync() + self._runner.destroy() def _execute(self, task): """ Executes a single task. - If the invokation of the task callback raises an - exception, the processing thread of the ExperimentController - will be stopped and the experiment will be aborted. - :param task: Object containing the callback to execute :type task: Task """ - # Invoke callback - task.status = TaskStatus.DONE - try: + # Invoke callback task.result = task.callback() + task.status = TaskStatus.DONE except: import traceback err = traceback.format_exc() task.result = err task.status = TaskStatus.ERROR - self._logger.error("Error occurred while executing task: %s" % err) - - # Set the EC to FAILED state (this will force to exit the task - # processing thread) - self._state = ECState.FAILED - - # Notify condition to wake up the processing thread - self._notify() - - # Propage error to the ParallelRunner - raise + self.logger.error("Error occurred while executing task: %s" % err) def _notify(self): - """ Awakes the processing thread in case it is blocked waiting - for a new task to be scheduled. 
+ """ Awakes the processing thread if it is blocked waiting + for new tasks to arrive + """ self._cond.acquire() self._cond.notify() self._cond.release() + def _build_from_netgraph(self, add_node_callback, add_edge_callback, + **kwargs): + """ Automates experiment description using a NetGraph instance. + """ + self._netgraph = NetGraph(**kwargs) + + if add_node_callback: + ### Add resources to the EC + for nid in self.netgraph.nodes(): + add_node_callback(self, nid) + + if add_edge_callback: + #### Add connections between resources + for nid1, nid2 in self.netgraph.edges(): + add_edge_callback(self, nid1, nid2) +
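
For reference, a minimal usage sketch of the EC API introduced in this diff (register_resource, set, register_connection, register_condition, deploy, wait_finished, trace, shutdown). The resource type strings ("linux::Node", "linux::Application"), the attribute names ("hostname", "command") and the "stdout" trace name are hypothetical placeholders provided by other NEPI modules, not by this file:

    from nepi.execution.ec import ExperimentController
    from nepi.execution.resource import ResourceAction, ResourceState

    ec = ExperimentController(exp_id="demo-experiment")

    # Hypothetical RM types and attribute names; the EC calls themselves
    # (register_resource, set, register_connection) are the ones added above.
    node = ec.register_resource("linux::Node")
    ec.set(node, "hostname", "example.host")

    app = ec.register_resource("linux::Application")
    ec.set(app, "command", "ping -c3 localhost")
    ec.register_connection(app, node)

    # Start the application 5 seconds after the node reaches STARTED
    ec.register_condition(app, ResourceAction.START, node,
                          ResourceState.STARTED, time="5s")

    ec.deploy()               # deploys every RM still in state NEW
    ec.wait_finished(app)     # blocks until the application is >= STOPPED

    print(ec.trace(app, "stdout"))   # "stdout" is an assumed trace name
    ec.shutdown()             # releases all resources and stops the EC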
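
A similarly hedged sketch of the task-scheduling interface described in schedule() and _process() above. The callback and the "5s" relative date are illustrative; the accepted date formats are those parsed by nepi.util.timefuncs.stabsformat:

    from nepi.execution.ec import ExperimentController
    from nepi.execution.scheduler import TaskStatus

    ec = ExperimentController(exp_id="scheduler-demo")

    def probe():
        # Any callable can be scheduled; its return value ends up in task.result
        return "done"

    # Schedule the callback 5 seconds from now and keep the task retrievable
    tid = ec.schedule("5s", probe, track=True)

    # Later, the tracked task can be inspected through get_task()
    task = ec.get_task(tid)
    if task.status == TaskStatus.DONE:
        print(task.result)

    ec.shutdown()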