#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
-import functools
-import logging
-import os
-import random
-import sys
-import time
-import threading
-
from nepi.util import guid
from nepi.util.parallel import ParallelRun
-from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
+from nepi.util.serializer import ECSerializer, SFormats
+from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType
# TODO: use multiprocessing instead of threading
-# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
+# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
+
+import functools
+import logging
+import os
+import sys
+import tempfile
+import time
+import threading
+import weakref
+
+class FailureLevel(object):
+ """ Describes the system failure state """
+ OK = 1
+ RM_FAILURE = 2
+ EC_FAILURE = 3
+
+class FailureManager(object):
+ """ The FailureManager is responsible for handling errors
+ and deciding whether an experiment should be aborted or not
+
+ """
+
+ def __init__(self, ec):
+ self._ec = weakref.ref(ec)
+ self._failure_level = FailureLevel.OK
+ self._abort = False
+
+ @property
+ def ec(self):
+ """ Returns the ExperimentController associated to this FailureManager
+
+ """
+
+ return self._ec()
+
+ @property
+ def abort(self):
+ return self._abort
+
+ def eval_failure(self, guid):
+ if self._failure_level == FailureLevel.OK:
+ rm = self.ec.get_resource(guid)
+ state = rm.state
+ critical = rm.get("critical")
+
+ if state == ResourceState.FAILED and critical:
+ self._failure_level = FailureLevel.RM_FAILURE
+ self._abort = True
+ self.ec.logger.debug("RM critical failure occurred on guid %d." \
+ " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+
+ def set_ec_failure(self):
+ self._failure_level = FailureLevel.EC_FAILURE
class ECState(object):
- """ State of the Experiment Controller
+ """ Possible states for an ExperimentController
"""
RUNNING = 1
"""
.. class:: Class Args :
- :param exp_id: Id of the experiment
- :type exp_id: int
- :param root_dir: Root directory of the experiment
- :type root_dir: str
+ :param exp_id: Human readable identifier for the experiment scenario.
+ :type exp_id: str
.. note::
- This class is the only one used by the User. Indeed, the user "talks"
- only with the Experiment Controller and this latter forward to
- the different Resources Manager the order provided by the user.
-
+ An experiment, or scenario, is defined by a concrete set of resources,
+ and the behavior, configuration and interconnection of those resources.
+ The Experiment Description (ED) is a detailed representation of a
+ single experiment. It contains all the necessary information to
+ allow repeating the experiment. NEPI allows to describe
+ experiments by registering components (resources), configuring them
+ and interconnecting them.
+
+ A same experiment (scenario) can be executed many times, generating
+ different results. We call an experiment execution (instance) a 'run'.
+
+ The ExperimentController (EC), is the entity responsible of
+ managing an experiment run. The same scenario can be
+ recreated (and re-run) by instantiating an EC and recreating
+ the same experiment description.
+
+ An experiment is represented as a graph of interconnected
+ resources. A resource is a generic concept in the sense that any
+ component taking part of an experiment, whether physical of
+ virtual, is considered a resource. A resources could be a host,
+ a virtual machine, an application, a simulator, a IP address.
+
+ A ResourceManager (RM), is the entity responsible for managing a
+ single resource. ResourceManagers are specific to a resource
+ type (i.e. An RM to control a Linux application will not be
+ the same as the RM used to control a ns-3 simulation).
+ To support a new type of resource, a new RM must be implemented.
+ NEPI already provides a variety of RMs to control basic resources,
+ and new can be extended from the existing ones.
+
+ Through the EC interface the user can create ResourceManagers (RMs),
+ configure them and interconnect them, to describe an experiment.
+ Describing an experiment through the EC does not run the experiment.
+ Only when the 'deploy()' method is invoked on the EC, the EC will take
+ actions to transform the 'described' experiment into a 'running' experiment.
+
+ While the experiment is running, it is possible to continue to
+ create/configure/connect RMs, and to deploy them to involve new
+ resources in the experiment (this is known as 'interactive' deployment).
+
+ An experiments in NEPI is identified by a string id,
+ which is either given by the user, or automatically generated by NEPI.
+ The purpose of this identifier is to separate files and results that
+ belong to different experiment scenarios.
+ However, since a same 'experiment' can be run many times, the experiment
+ id is not enough to identify an experiment instance (run).
+ For this reason, the ExperimentController has two identifier, the
+ exp_id, which can be re-used in different ExperimentController,
+ and the run_id, which is unique to one ExperimentController instance, and
+ is automatically generated by NEPI.
+
"""
- def __init__(self, exp_id = None, root_dir = "/tmp"):
+ @classmethod
+ def load(cls, filepath, format = SFormats.XML):
+ serializer = ECSerializer()
+ ec = serializer.load(filepath)
+ return ec
+
+ def __init__(self, exp_id = None, local_dir = None, persist = False,
+ add_node_callback = None, add_edge_callback = None, **kwargs):
+ """ ExperimentController entity to model an execute a network
+ experiment.
+
+ :param exp_id: Human readable name to identify the experiment
+ :type exp_id: str
+
+ :param local_dir: Path to local directory where to store experiment
+ related files
+ :type local_dir: str
+
+ :param persist: Save an XML description of the experiment after
+ completion at local_dir
+ :type persist: bool
+
+ :param add_node_callback: Callback to invoke for node instantiation
+ when automatic topology creation mode is used
+ :type add_node_callback: function
+
+ :param add_edge_callback: Callback to invoke for edge instantiation
+ when automatic topology creation mode is used
+ :type add_edge_callback: function
+
+ """
super(ExperimentController, self).__init__()
- # root directory to store files
- self._root_dir = root_dir
- # experiment identifier given by the user
- self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+ # Logging
+ self._logger = logging.getLogger("ExperimentController")
+
+ # Run identifier. It identifies a concrete execution instance (run)
+ # of an experiment.
+ # Since a same experiment (same configuration) can be executed many
+ # times, this run_id permits to separate result files generated on
+ # different experiment executions
+ self._run_id = tsformat()
+
+ # Experiment identifier. Usually assigned by the user
+ # Identifies the experiment scenario (i.e. configuration,
+ # resources used, etc)
+ self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
+
+ # Local path where to store experiment related files (results, etc)
+ if not local_dir:
+ local_dir = tempfile.gettempdir() # /tmp
+
+ self._local_dir = local_dir
+ self._exp_dir = os.path.join(local_dir, self.exp_id)
+ self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+ # If True persist the experiment controller in XML format, after completion
+ self._persist = persist
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
# Resource managers
self._resources = dict()
- # Scheduler
+ # Scheduler. It a queue that holds tasks scheduled for
+ # execution, and yields the next task to be executed
+ # ordered by execution and arrival time
self._scheduler = HeapScheduler()
# Tasks
self._tasks = dict()
- # Event processing thread
- self._cond = threading.Condition()
- self._thread = threading.Thread(target = self._process)
- self._thread.setDaemon(True)
- self._thread.start()
+ # RM groups (for deployment)
+ self._groups = dict()
+
+ # generator of globally unique id for groups
+ self._group_id_generator = guid.GuidGenerator()
+
+ # Flag to stop processing thread
+ self._stop = False
+
+ # Entity in charge of managing system failures
+ self._fm = FailureManager(self)
# EC state
self._state = ECState.RUNNING
- # Logging
- self._logger = logging.getLogger("ExperimentController")
+ # Automatically construct experiment description
+ self._netgraph = None
+ if add_node_callback or add_edge_callback or kwargs.get("topology"):
+ self._build_from_netgraph(add_node_callback, add_edge_callback,
+ **kwargs)
+ # The runner is a pool of threads used to parallelize
+ # execution of tasks
+ self._nthreads = 20
+ self._runner = None
+
+ # Event processing thread
+ self._cond = threading.Condition()
+ self._thread = threading.Thread(target = self._process)
+ self._thread.setDaemon(True)
+ self._thread.start()
+
@property
def logger(self):
- """ Return the logger of the Experiment Controller
+ """ Returns the logger instance of the Experiment Controller
"""
return self._logger
+ @property
+ def failure_level(self):
+ """ Returns the level of FAILURE of th experiment
+
+ """
+
+ return self._fm._failure_level
+
@property
def ecstate(self):
- """ Return the state of the Experiment Controller
+ """ Returns the state of the Experiment Controller
"""
return self._state
@property
def exp_id(self):
- """ Return the experiment ID
+ """ Returns the experiment id assigned by the user
"""
- exp_id = self._exp_id
- if not exp_id.startswith("nepi-"):
- exp_id = "nepi-" + exp_id
- return exp_id
+ return self._exp_id
@property
- def finished(self):
- """ Put the state of the Experiment Controller into a final state :
- Either TERMINATED or FAILED
+ def run_id(self):
+ """ Returns the experiment instance (run) identifier (automatically
+ generated)
"""
- return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+ return self._run_id
+
+ @property
+ def nthreads(self):
+ """ Returns the number of processing nthreads used
+
+ """
+ return self._nthreads
+
+ @property
+ def local_dir(self):
+ """ Root local directory for experiment files
+
+ """
+ return self._local_dir
+
+ @property
+ def exp_dir(self):
+ """ Local directory to store results and other files related to the
+ experiment.
+
+ """
+ return self._exp_dir
+
+ @property
+ def run_dir(self):
+ """ Local directory to store results and other files related to the
+ experiment run.
+
+ """
+ return self._run_dir
+
+ @property
+ def persist(self):
+ """ If True, persists the ExperimentController to XML format upon
+ experiment completion
+
+ """
+ return self._persist
+
+ @property
+ def netgraph(self):
+ """ Return NetGraph instance if experiment description was automatically
+ generated
+
+ """
+ return self._netgraph
+
+ @property
+ def abort(self):
+ """ Returns True if the experiment has failed and should be interrupted,
+ False otherwise.
+
+ """
+ return self._fm.abort
+
+ def inform_failure(self, guid):
+ """ Reports a failure in a RM to the EC for evaluation
+
+ :param guid: Resource id
+ :type guid: int
+
+ """
+
+ return self._fm.eval_failure(guid)
def wait_finished(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reach the state FINISHED
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state >= STOPPED (i.e. STOPPED, FAILED or
+ RELEASED ), or until a failure in the experiment occurs
+ (i.e. abort == True)
+
+ :param guids: List of guids
+ :type guids: list
+
+ """
+
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.STOPPED,
+ quit = quit)
+
+ def wait_started(self, guids):
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state >= STARTED, or until a failure in the
+ experiment occurs (i.e. abort == True)
+
+ :param guids: List of guids
+ :type guids: list
+
+ """
+
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.STARTED,
+ quit = quit)
+
+ def wait_released(self, guids):
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state == RELEASED, or until the EC fails
+
+ :param guids: List of guids
+ :type guids: list
+
+ """
+
+ def quit():
+ return self._state == ECState.FAILED
+
+ return self.wait(guids, state = ResourceState.RELEASED,
+ quit = quit)
+
+ def wait_deployed(self, guids):
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state >= READY, or until a failure in the
+ experiment occurs (i.e. abort == True)
+
+ :param guids: List of guids
+ :type guids: list
- :param guids: List of guids
- :type guids: list
+ """
+
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.READY,
+ quit = quit)
+
+ def wait(self, guids, state, quit):
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state >= 'state', or until the 'quit' callback
+ yields True
+
+ :param guids: List of guids
+ :type guids: list
+
"""
if isinstance(guids, int):
guids = [guids]
- while not all([self.state(guid) in [ResourceState.FINISHED,
- ResourceState.STOPPED,
- ResourceState.FAILED] \
- for guid in guids]) and not self.finished:
- # We keep the sleep as large as possible to
- # decrese the number of RM state requests
- time.sleep(2)
-
+ # Make a copy to avoid modifying the original guids list
+ guids = list(guids)
+
+ while True:
+ # If there are no more guids to wait for
+ # or the quit function returns True, exit the loop
+ if len(guids) == 0 or quit():
+ break
+
+ # If a guid reached one of the target states, remove it from list
+ guid = guids.pop()
+ rm = self.get_resource(guid)
+ rstate = rm.state
+
+ if rstate >= state:
+ self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
+ rm.get_rtype(), guid, rstate, state))
+ else:
+ # Debug...
+ self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+ guid, rstate, state))
+
+ guids.append(guid)
+
+ time.sleep(0.5)
+
+ def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
+ plotter = ECPlotter()
+ fpath = plotter.plot(self, dirpath = dirpath, format= format,
+ show = show)
+ return fpath
+
+ def serialize(self, format = SFormats.XML):
+ serializer = ECSerializer()
+ sec = serializer.load(self, format = format)
+ return sec
+
+ def save(self, dirpath = None, format = SFormats.XML):
+ if dirpath == None:
+ dirpath = self.run_dir
+
+ try:
+ os.makedirs(dirpath)
+ except OSError:
+ pass
+
+ serializer = ECSerializer()
+ path = serializer.save(self, dirpath, format = format)
+ return path
+
def get_task(self, tid):
- """ Get a specific task
+ """ Returns a task by its id
- :param tid: Id of the task
- :type tid: int
- :rtype: unknow
+ :param tid: Id of the task
+ :type tid: int
+
+ :rtype: Task
+
"""
return self._tasks.get(tid)
def get_resource(self, guid):
- """ Get a specific Resource Manager
+ """ Returns a registered ResourceManager by its guid
- :param guid: Id of the task
- :type guid: int
- :rtype: ResourceManager
+ :param guid: Id of the resource
+ :type guid: int
+
+ :rtype: ResourceManager
+
+ """
+ rm = self._resources.get(guid)
+ return rm
+
+ def get_resources_by_type(self, rtype):
+ """ Returns the ResourceManager objects of type rtype
+
+ :param rtype: Resource type
+ :type rtype: string
+
+ :rtype: list of ResourceManagers
+
"""
- return self._resources.get(guid)
+ rms = []
+ for guid, rm in self._resources.iteritems():
+ if rm.get_rtype() == rtype:
+ rms.append(rm)
+ return rms
+
+ def remove_resource(self, guid):
+ del self._resources[guid]
@property
def resources(self):
- """ Returns the list of all the Resource Manager Id
+ """ Returns the guids of all ResourceManagers
+
+ :return: Set of all RM guids
+ :rtype: list
+
+ """
+ keys = self._resources.keys()
+
+ return keys
- :rtype: set
+ def filter_resources(self, rtype):
+ """ Returns the guids of all ResourceManagers of type rtype
+
+ :param rtype: Resource type
+ :type rtype: string
+
+ :rtype: list of guids
+
"""
- return self._resources.keys()
+ rms = []
+ for guid, rm in self._resources.iteritems():
+ if rm.get_rtype() == rtype:
+ rms.append(rm.guid)
+ return rms
def register_resource(self, rtype, guid = None):
- """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
- for the RM of type 'rtype' and add it to the list of Resources.
+ """ Registers a new ResourceManager of type 'rtype' in the experiment
+
+ This method will assign a new 'guid' for the RM, if no guid
+ is specified.
+
+ :param rtype: Type of the RM
+ :type rtype: str
- :param rtype: Type of the RM
- :type rtype: str
- :return : Id of the RM
- :rtype: int
+ :return: Guid of the RM
+ :rtype: int
+
"""
# Get next available guid
guid = self._guid_generator.next(guid)
return guid
def get_attributes(self, guid):
- """ Return all the attibutes of a specific RM
+ """ Returns all the attributes of the RM with guid 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :return: List of attributes
+ :rtype: list
- :param guid: Guid of the RM
- :type guid: int
- :return : List of attributes
- :rtype: list
"""
rm = self.get_resource(guid)
return rm.get_attributes()
+ def get_attribute(self, guid, name):
+ """ Returns the attribute 'name' of the RM with guid 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: Name of the attribute
+ :type name: str
+
+ :return: The attribute with name 'name'
+ :rtype: Attribute
+
+ """
+ rm = self.get_resource(guid)
+ return rm.get_attribute(name)
+
def register_connection(self, guid1, guid2):
- """ Registers a guid1 with a guid2.
- The declaration order is not important
+ """ Registers a connection between a RM with guid 'guid1'
+ and another RM with guid 'guid2'.
+
+ The order of the in which the two guids are provided is not
+ important, since the connection relationship is symmetric.
:param guid1: First guid to connect
:type guid1: ResourceManager
rm1 = self.get_resource(guid1)
rm2 = self.get_resource(guid2)
- rm1.connect(guid2)
- rm2.connect(guid1)
+ rm1.register_connection(guid2)
+ rm2.register_connection(guid1)
- def register_condition(self, group1, action, group2, state,
+ def register_condition(self, guids1, action, guids2, state,
time = None):
- """ Registers an action START or STOP for all RM on group1 to occur
- time 'time' after all elements in group2 reached state 'state'.
+ """ Registers an action START, STOP or DEPLOY for all RM on list
+ guids1 to occur at time 'time' after all elements in list guids2
+ have reached state 'state'.
- :param group1: List of guids of RMs subjected to action
- :type group1: list
+ :param guids1: List of guids of RMs subjected to action
+ :type guids1: list
- :param action: Action to register (either START or STOP)
+ :param action: Action to perform (either START, STOP or DEPLOY)
:type action: ResourceAction
- :param group2: List of guids of RMs to we waited for
- :type group2: list
+ :param guids2: List of guids of RMs to we waited for
+ :type guids2: list
- :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+ :param state: State to wait for on RMs of list guids2 (STARTED,
+ STOPPED, etc)
:type state: ResourceState
- :param time: Time to wait after group2 has reached status
+ :param time: Time to wait after guids2 has reached status
:type time: string
"""
- if isinstance(group1, int):
- group1 = [group1]
- if isinstance(group2, int):
- group2 = [group2]
+ if isinstance(guids1, int):
+ guids1 = [guids1]
+ if isinstance(guids2, int):
+ guids2 = [guids2]
- for guid1 in group1:
+ for guid1 in guids1:
rm = self.get_resource(guid1)
- rm.register_condition(action, group2, state, time)
+ rm.register_condition(action, guids2, state, time)
- def register_trace(self, guid, name):
- """ Enable trace
+ def enable_trace(self, guid, name):
+ """ Enables a trace to be collected during the experiment run
+
+ :param name: Name of the trace
+ :type name: str
- :param name: Name of the trace
- :type name: str
"""
rm = self.get_resource(guid)
- rm.register_trace(name)
+ rm.enable_trace(name)
+
+ def trace_enabled(self, guid, name):
+ """ Returns True if the trace of name 'name' is enabled
+
+ :param name: Name of the trace
+ :type name: str
+
+ """
+ rm = self.get_resource(guid)
+ return rm.trace_enabled(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
- """ Get information on collected trace
+ """ Returns information on a collected trace, the trace stream or
+ blocks (chunks) of the trace stream
- :param name: Name of the trace
- :type name: str
+ :param name: Name of the trace
+ :type name: str
- :param attr: Can be one of:
+ :param attr: Can be one of:
- TraceAttr.ALL (complete trace content),
- - TraceAttr.STREAM (block in bytes to read starting at offset),
+ - TraceAttr.STREAM (block in bytes to read starting
+ at offset),
- TraceAttr.PATH (full path to the trace file),
- TraceAttr.SIZE (size of trace file).
- :type attr: str
+ :type attr: str
+
+ :param block: Number of bytes to retrieve from trace, when attr is
+ TraceAttr.STREAM
+ :type name: int
- :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
- :type name: int
+ :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
+ :type name: int
- :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
- :type name: int
+ :rtype: str
- :rtype: str
"""
rm = self.get_resource(guid)
return rm.trace(name, attr, block, offset)
+ def get_traces(self, guid):
+ """ Returns the list of the trace names of the RM with guid 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :return: List of trace names
+ :rtype: list
+
+ """
+ rm = self.get_resource(guid)
+ return rm.get_traces()
+
+
def discover(self, guid):
- """ Discover a specific RM defined by its 'guid'
+ """ Discovers an available resource matching the criteria defined
+ by the RM with guid 'guid', and associates that resource to the RM
+
+ Not all RM types require (or are capable of) performing resource
+ discovery. For the RM types which are not capable of doing so,
+ invoking this method does not have any consequences.
:param guid: Guid of the RM
:type guid: int
return rm.discover()
def provision(self, guid):
- """ Provision a specific RM defined by its 'guid'
+ """ Provisions the resource associated to the RM with guid 'guid'.
+
+ Provisioning means making a resource 'accessible' to the user.
+ Not all RM types require (or are capable of) performing resource
+ provisioning. For the RM types which are not capable of doing so,
+ invoking this method does not have any consequences.
:param guid: Guid of the RM
:type guid: int
return rm.provision()
def get(self, guid, name):
- """ Get a specific attribute 'name' from the RM 'guid'
+ """ Returns the value of the attribute with name 'name' on the
+ RM with guid 'guid'
:param guid: Guid of the RM
:type guid: int
- :param name: attribute's name
+ :param name: Name of the attribute
:type name: str
+ :return: The value of the attribute with name 'name'
+
"""
rm = self.get_resource(guid)
return rm.get(name)
def set(self, guid, name, value):
- """ Set a specific attribute 'name' from the RM 'guid'
- with the value 'value'
+ """ Modifies the value of the attribute with name 'name' on the
+ RM with guid 'guid'.
:param guid: Guid of the RM
:type guid: int
- :param name: attribute's name
+ :param name: Name of the attribute
:type name: str
- :param value: attribute's value
+ :param value: Value of the attribute
"""
rm = self.get_resource(guid)
- return rm.set(name, value)
+ rm.set(name, value)
+
+ def get_global(self, rtype, name):
+ """ Returns the value of the global attribute with name 'name' on the
+ RMs of rtype 'rtype'.
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: Name of the attribute
+ :type name: str
+
+ :return: The value of the attribute with name 'name'
+
+ """
+ rclass = ResourceFactory.get_resource_type(rtype)
+ return rclass.get_global(name)
+
+ def set_global(self, rtype, name, value):
+ """ Modifies the value of the global attribute with name 'name' on the
+ RMs of with rtype 'rtype'.
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: Name of the attribute
+ :type name: str
+
+ :param value: Value of the attribute
+
+ """
+ rclass = ResourceFactory.get_resource_type(rtype)
+ return rclass.set_global(name, value)
def state(self, guid, hr = False):
""" Returns the state of a resource
"""
rm = self.get_resource(guid)
+ state = rm.state
+
if hr:
- return ResourceState2str.get(rm.state)
+ return ResourceState2str.get(state)
- return rm.state
+ return state
def stop(self, guid):
- """ Stop a specific RM defined by its 'guid'
+ """ Stops the RM with guid 'guid'
+
+ Stopping a RM means that the resource it controls will
+ no longer take part of the experiment.
:param guid: Guid of the RM
:type guid: int
return rm.stop()
def start(self, guid):
- """ Start a specific RM defined by its 'guid'
+ """ Starts the RM with guid 'guid'
+
+ Starting a RM means that the resource it controls will
+ begin taking part of the experiment.
:param guid: Guid of the RM
:type guid: int
rm = self.get_resource(guid)
return rm.start()
- def set_with_conditions(self, name, value, group1, group2, state,
+ def get_start_time(self, guid):
+ """ Returns the start time of the RM as a timestamp """
+ rm = self.get_resource(guid)
+ return rm.start_time
+
+ def get_stop_time(self, guid):
+ """ Returns the stop time of the RM as a timestamp """
+ rm = self.get_resource(guid)
+ return rm.stop_time
+
+ def get_discover_time(self, guid):
+ """ Returns the discover time of the RM as a timestamp """
+ rm = self.get_resource(guid)
+ return rm.discover_time
+
+ def get_provision_time(self, guid):
+ """ Returns the provision time of the RM as a timestamp """
+ rm = self.get_resource(guid)
+ return rm.provision_time
+
+ def get_ready_time(self, guid):
+ """ Returns the deployment time of the RM as a timestamp """
+ rm = self.get_resource(guid)
+ return rm.ready_time
+
+ def get_release_time(self, guid):
+ """ Returns the release time of the RM as a timestamp """
+ rm = self.get_resource(guid)
+ return rm.release_time
+
+ def get_failed_time(self, guid):
+ """ Returns the time failure occured for the RM as a timestamp """
+ rm = self.get_resource(guid)
+ return rm.failed_time
+
+ def set_with_conditions(self, name, value, guids1, guids2, state,
time = None):
- """ Set value 'value' on attribute with name 'name' on all RMs of
- group1 when 'time' has elapsed since all elements in group2
- have reached state 'state'.
+ """ Modifies the value of attribute with name 'name' on all RMs
+ on the guids1 list when time 'time' has elapsed since all
+ elements in guids2 list have reached state 'state'.
:param name: Name of attribute to set in RM
:type name: string
:param value: Value of attribute to set in RM
:type name: string
- :param group1: List of guids of RMs subjected to action
- :type group1: list
+ :param guids1: List of guids of RMs subjected to action
+ :type guids1: list
:param action: Action to register (either START or STOP)
:type action: ResourceAction
- :param group2: List of guids of RMs to we waited for
- :type group2: list
+ :param guids2: List of guids of RMs to we waited for
+ :type guids2: list
:param state: State to wait for on RMs (STARTED, STOPPED, etc)
:type state: ResourceState
- :param time: Time to wait after group2 has reached status
+ :param time: Time to wait after guids2 has reached status
:type time: string
"""
- if isinstance(group1, int):
- group1 = [group1]
- if isinstance(group2, int):
- group2 = [group2]
+ if isinstance(guids1, int):
+ guids1 = [guids1]
+ if isinstance(guids2, int):
+ guids2 = [guids2]
- for guid1 in group1:
+ for guid1 in guids1:
rm = self.get_resource(guid)
- rm.set_with_conditions(name, value, group2, state, time)
-
- def stop_with_conditions(self, guid):
- """ Stop a specific RM defined by its 'guid' only if all the conditions are true
+ rm.set_with_conditions(name, value, guids2, state, time)
- :param guid: Guid of the RM
- :type guid: int
-
- """
- rm = self.get_resource(guid)
- return rm.stop_with_conditions()
+ def deploy(self, guids = None, wait_all_ready = True, group = None):
+ """ Deploys all ResourceManagers in the guids list.
+
+ If the argument 'guids' is not given, all RMs with state NEW
+ are deployed.
- def start_with_conditions(self, guid):
- """ Start a specific RM defined by its 'guid' only if all the conditions are true
+ :param guids: List of guids of RMs to deploy
+ :type guids: list
- :param guid: Guid of the RM
+ :param wait_all_ready: Wait until all RMs are ready in
+ order to start the RMs
:type guid: int
- """
- rm = self.get_resource(guid)
- return rm.start_with_condition()
-
- def deploy(self, group = None, wait_all_ready = True):
- """ Deploy all resource manager in group
-
- :param group: List of guids of RMs to deploy
- :type group: list
-
- :param wait_all_ready: Wait until all RMs are ready in
- order to start the RMs
- :type guid: int
+ :param group: Id of deployment group in which to deploy RMs
+ :type group: int
"""
self.logger.debug(" ------- DEPLOY START ------ ")
+ if not guids:
+ # If no guids list was passed, all 'NEW' RMs will be deployed
+ guids = []
+ for guid, rm in self._resources.iteritems():
+ if rm.state == ResourceState.NEW:
+ guids.append(guid)
+
+ if isinstance(guids, int):
+ guids = [guids]
+
+ # Create deployment group
+ # New guids can be added to a same deployment group later on
+ new_group = False
if not group:
- group = self.resources
-
- # Before starting deployment we disorder the group list with the
- # purpose of speeding up the whole deployment process.
- # It is likely that the user inserted in the 'group' list closely
- # resources one after another (e.g. all applications
- # connected to the same node can likely appear one after another).
- # This can originate a slow down in the deployment since the N
- # threads the parallel runner uses to processes tasks may all
- # be taken up by the same family of resources waiting for the
- # same conditions (e.g. LinuxApplications running on a same
- # node share a single lock, so they will tend to be serialized).
- # If we disorder the group list, this problem can be mitigated.
- random.shuffle(group)
+ new_group = True
+ group = self._group_id_generator.next()
+
+ if group not in self._groups:
+ self._groups[group] = []
+
+ self._groups[group].extend(guids)
def wait_all_and_start(group):
+ # Function that checks if all resources are READY
+ # before scheduling a start_with_conditions for each RM
reschedule = False
- for guid in group:
- rm = self.get_resource(guid)
- if rm.state < ResourceState.READY:
+
+ # Get all guids in group
+ guids = self._groups[group]
+
+ for guid in guids:
+ if self.state(guid) < ResourceState.READY:
reschedule = True
break
callback = functools.partial(wait_all_and_start, group)
self.schedule("1s", callback)
else:
- # If all resources are read, we schedule the start
- for guid in group:
+ # If all resources are ready, we schedule the start
+ for guid in guids:
rm = self.get_resource(guid)
- self.schedule("0.01s", rm.start_with_conditions)
-
- if wait_all_ready:
- # Schedule the function that will check all resources are
- # READY, and only then it will schedule the start.
- # This is aimed to reduce the number of tasks looping in the scheduler.
- # Intead of having N start tasks, we will have only one
+ self.schedule("0s", rm.start_with_conditions)
+
+ if rm.conditions.get(ResourceAction.STOP):
+ # Only if the RM has STOP conditions we
+ # schedule a stop. Otherwise the RM will stop immediately
+ self.schedule("0s", rm.stop_with_conditions)
+
+ if wait_all_ready and new_group:
+ # Schedule a function to check that all resources are
+ # READY, and only then schedule the start.
+ # This aims at reducing the number of tasks looping in the
+ # scheduler.
+ # Instead of having many start tasks, we will have only one for
+ # the whole group.
callback = functools.partial(wait_all_and_start, group)
- self.schedule("1s", callback)
+ self.schedule("0s", callback)
- for guid in group:
+ for guid in guids:
rm = self.get_resource(guid)
- self.schedule("0.001s", rm.deploy)
+ rm.deployment_group = group
+ self.schedule("0s", rm.deploy_with_conditions)
if not wait_all_ready:
- self.schedule("1s", rm.start_with_conditions)
+ self.schedule("0s", rm.start_with_conditions)
- if rm.conditions.get(ResourceAction.STOP):
- # Only if the RM has STOP conditions we
- # schedule a stop. Otherwise the RM will stop immediately
- self.schedule("2s", rm.stop_with_conditions)
+ if rm.conditions.get(ResourceAction.STOP):
+ # Only if the RM has STOP conditions we
+ # schedule a stop. Otherwise the RM will stop immediately
+ self.schedule("0s", rm.stop_with_conditions)
+ def release(self, guids = None):
+ """ Releases all ResourceManagers in the guids list.
- def release(self, group = None):
- """ Release the elements of the list 'group' or
- all the resources if any group is specified
+ If the argument 'guids' is not given, all RMs registered
+ in the experiment are released.
- :param group: List of RM
- :type group: list
+ :param guids: List of RM guids
+ :type guids: list
"""
- if not group:
- group = self.resources
+ if isinstance(guids, int):
+ guids = [guids]
- threads = []
- for guid in group:
+ if not guids:
+ guids = self.resources
+
+ for guid in guids:
rm = self.get_resource(guid)
- thread = threading.Thread(target=rm.release)
- threads.append(thread)
- thread.setDaemon(True)
- thread.start()
-
- while list(threads) and not self.finished:
- thread = threads[0]
- # Time out after 5 seconds to check EC not terminated
- thread.join(5)
- if not thread.is_alive():
- threads.remove(thread)
+ self.schedule("0s", rm.release)
+
+ self.wait_released(guids)
+
+ if self.persist:
+ self.save()
+
+ for guid in guids:
+ if self.get(guid, "hardRelease"):
+ self.remove_resource(guid)
def shutdown(self):
- """ Shutdown the Experiment Controller.
- It means : Release all the resources and stop the scheduler
+ """ Releases all resources and stops the ExperimentController
"""
+ # If there was a major failure we can't exit gracefully
+ if self._state == ECState.FAILED:
+ raise RuntimeError("EC failure. Can not exit gracefully")
+
+ # Remove all pending tasks from the scheduler queue
+ for tid in list(self._scheduler.pending):
+ self._scheduler.remove(tid)
+
+ # Remove pending tasks from the workers queue
+ self._runner.empty()
+
self.release()
- self._stop_scheduler()
+ # Mark the EC state as TERMINATED
+ self._state = ECState.TERMINATED
+
+ # Stop processing thread
+ self._stop = True
+
+ # Notify condition to wake up the processing thread
+ self._notify()
if self._thread.is_alive():
self._thread.join()
def schedule(self, date, callback, track = False):
- """ Schedule a callback to be executed at time date.
+ """ Schedules a callback to be executed at time 'date'.
:param date: string containing execution time for the task.
It can be expressed as an absolute time, using
Python function, and receives args and kwargs
as arguments.
- :param track: if set to True, the task will be retrivable with
+ :param track: if set to True, the task will be retrievable with
the get_task() method
:return : The Id of the task
+ :rtype: int
+
"""
- timestamp = strfvalid(date)
-
+ timestamp = stabsformat(date)
task = Task(timestamp, callback)
task = self._scheduler.schedule(task)
if track:
self._tasks[task.id] = task
-
+
# Notify condition to wake up the processing thread
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
+ self._notify()
return task.id
def _process(self):
- """ Process at executing the task that are in the scheduler.
+ """ Process scheduled tasks.
+
+ .. note::
+
+ Tasks are scheduled by invoking the schedule method with a target
+ callback and an execution time.
+ The schedule method creates a new Task object with that callback
+ and execution time, and pushes it into the '_scheduler' queue.
+ The execution time and the order of arrival of tasks are used
+ to order the tasks in the queue.
+
+ The _process method is executed in an independent thread held by
+ the ExperimentController for as long as the experiment is running.
+ This method takes tasks from the '_scheduler' queue in a loop
+ and processes them in parallel using multithreading.
+ The environmental variable NEPI_NTHREADS can be used to control
+ the number of threads used to process tasks. The default value is
+ 50.
+
+ To execute tasks in parallel, a ParallelRunner (PR) object is used.
+ This object keeps a pool of threads (workers), and a queue of tasks
+ scheduled for 'immediate' execution.
+
+ On each iteration, the '_process' loop will take the next task that
+ is scheduled for 'future' execution from the '_scheduler' queue,
+ and if the execution time of that task is >= to the current time,
+ it will push that task into the PR for 'immediate execution'.
+ As soon as a worker is free, the PR will assign the next task to
+ that worker.
+
+ Upon receiving a task to execute, each PR worker (thread) will
+ invoke the _execute method of the EC, passing the task as
+ argument.
+ The _execute method will then invoke task.callback inside a
+ try/except block. If an exception is raised by the tasks.callback,
+ it will be trapped by the try block, logged to standard error
+ (usually the console), and the task will be marked as failed.
"""
- runner = ParallelRun(maxthreads = 50)
- runner.start()
+ self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
+ self._runner = ParallelRun(maxthreads = self.nthreads)
+ self._runner.start()
- try:
- while not self.finished:
+ while not self._stop:
+ try:
self._cond.acquire()
+
task = self._scheduler.next()
- self._cond.release()
if not task:
- # It there are not tasks in the tasks queue we need to
- # wait until a call to schedule wakes us up
- self._cond.acquire()
+ # No task to execute. Wait for a new task to be scheduled.
self._cond.wait()
- self._cond.release()
- else:
- # If the task timestamp is in the future the thread needs to wait
- # until time elapse or until another task is scheduled
- now = strfnow()
+ else:
+ # The task timestamp is in the future. Wait for timeout
+ # or until another task is scheduled.
+ now = tnow()
if now < task.timestamp:
- # Calculate time difference in seconds
- timeout = strfdiff(task.timestamp, now)
+ # Calculate timeout in seconds
+ timeout = tdiffsec(task.timestamp, now)
+
# Re-schedule task with the same timestamp
self._scheduler.schedule(task)
- # Sleep until timeout or until a new task awakes the condition
- self._cond.acquire()
+
+ task = None
+
+ # Wait timeout or until a new task awakes the condition
self._cond.wait(timeout)
- self._cond.release()
- else:
- # Process tasks in parallel
- runner.put(self._execute, task)
- except:
- import traceback
- err = traceback.format_exc()
- self._logger.error("Error while processing tasks in the EC: %s" % err)
+
+ self._cond.release()
- self._state = ECState.FAILED
-
- # Mark EC state as terminated
- if self.ecstate == ECState.RUNNING:
- # Synchronize to get errors if occurred
- runner.sync()
- self._state = ECState.TERMINATED
+ if task:
+ # Process tasks in parallel
+ self._runner.put(self._execute, task)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.logger.error("Error while processing tasks in the EC: %s" % err)
+
+ # Set the EC to FAILED state
+ self._state = ECState.FAILED
+
+ # Set the FailureManager failure level to EC failure
+ self._fm.set_ec_failure()
+
+ self.logger.debug("Exiting the task processing loop ... ")
+
+ self._runner.sync()
+ self._runner.destroy()
def _execute(self, task):
- """ Invoke the callback of the task 'task'
+ """ Executes a single task.
- :param task: Id of the task
- :type task: int
+ :param task: Object containing the callback to execute
+ :type task: Task
"""
- # Invoke callback
- task.status = TaskStatus.DONE
-
try:
+ # Invoke callback
task.result = task.callback()
+ task.status = TaskStatus.DONE
except:
import traceback
err = traceback.format_exc()
task.result = err
task.status = TaskStatus.ERROR
- self._logger.error("Error occurred while executing task: %s" % err)
-
- self._stop_scheduler()
-
- # Propage error to the ParallelRunner
- raise
-
- def _stop_scheduler(self):
- """ Stop the scheduler and put the EC into a FAILED State.
+ self.logger.error("Error occurred while executing task: %s" % err)
+ def _notify(self):
+ """ Awakes the processing thread if it is blocked waiting
+ for new tasks to arrive
+
"""
-
- # Mark the EC as failed
- self._state = ECState.FAILED
-
- # Wake up the EC in case it was sleeping
self._cond.acquire()
self._cond.notify()
self._cond.release()
+ def _build_from_netgraph(self, add_node_callback, add_edge_callback,
+ **kwargs):
+ """ Automates experiment description using a NetGraph instance.
+ """
+ self._netgraph = NetGraph(**kwargs)
+
+ if add_node_callback:
+ ### Add resources to the EC
+ for nid in self.netgraph.nodes():
+ add_node_callback(self, nid)
+
+ if add_edge_callback:
+ #### Add connections between resources
+ for nid1, nid2 in self.netgraph.edges():
+ add_edge_callback(self, nid1, nid2)