the EC exposes another set of methods, which form the execution API.
These two APIs are described in detail in the rest of this chapter.
+
\section{The experiment script}
NEPI is a Python-based language and all classes and functions can
\end{lstlisting}
-Once this is done, an ExperimentController instance must be instantiated
-for a particular experiment. The ExperimentController constructor receives
+Once this is done, an ExperimentController must be instantiated for
+the experiment. The ExperimentController constructor receives
the optional argument \emph{exp\_id}. This argument is important because
it defines the experiment identity and allows to distinguish among different
experiments. If an experiment id is not explicitly given, NEPI will automatically
\end{lstlisting}
-%TODO: What is the run_id ??
+Since a same experiment can be ran more than one time, and this is
+often desirable to obtain statistical data, the EC identifies
+different runs of an experiment with a same \emph{exp\_id} with
+another attribute, the \emph{run\_id}. The \emph{run\_id} is
+a timestamp string value, and in combination with the \emph{exp\_id},
+it allows to uniquely identify an experiment instance.
+
+\begin{lstlisting}[language=Python]
+
+run_id = ec.run_id
+
+\end{lstlisting}
+
+
\section{The design API}
describing the experiment. The design API is the set of methods which
allow to do so.
+
\subsection{Registering resources}
Every resource supported by NEPI is controlled by a specific ResourceManager
The EC keeps internal references to all RMs, which the user can
reference using the corresponding guid value.
+
\subsection{Attributes}
ResourceManagers expose the configurable parameters of resources
\end{lstlisting}
-% Critical attribute
Since each RM type exposes the characteristics of a particular type
of resource, it is to be expected that different RMs will have different
attributes. However, there a particular attribute that is common to all RMs.
resource and carry on with the experiment. Otherwise, the EC will immediately
interrupt the experiment.
+
\subsection{Traces}
A Trace represent a stream of data collected during the experiment and associated
\end{lstlisting}
+
\subsection{Registering connections}
In order to describe the experiment set-up, resources need to be
RMs \emph{`know'} internally how to interpret the connection
relationship.
+
\subsection{Registering conditions}
All ResourceMangers must go through the same sequence of state transitions.
Existing states are: NEW, DISCOVERED, PROVISIONED, READY, STARTED, STOPPED,
FAILED and RELEASED. \\
+
+
\section{The execution API}
+After registering all the resources and connections and setting attributes and
+traces, once the experiment we want to conduct has been described, we can
+proceed to run it. To this purpose we make use of the \emph{execution} methods
+exposed by the EC.
+
+
\subsection{Deploying an experiment}
-%TODO: Talk about groups
-%TODO: Talk about interactive deploymet
+Deploying an experiment is very easy, it only requires to invoke the
+\emph{deploy} method of the EC.
+
+\begin{lstlisting}[language=Python]
+
+ec.deploy()
+
+\end{lstlisting}
+
+Given the experiment description provided earlier, the EC will take care
+of automatically performing all necessary actions to discover, provision,
+configure and start all resources registered in the experiment.
+
+Furthermore, NEPI does not restrict deployment to only one time, it allows
+to continue to register, connect and configure resources and deploy them
+at any moment. We call this feature \emph{interactive} or \emph{dynamic}
+deployment.
+
+The \emph{deploy} method can receive other optional arguments to customize
+deployment. By default, the EC will deploy all registered RMs that are in
+state NEW. However, it is possible to specify a subset of resources to be
+deployed using the \emph{guids} argument.
+
+\begin{lstlisting}[language=Python]
+
+ec.deploy(guids=[guid1, guid2, guid3])
+
+\end{lstlisting}
+
+Another useful argument of the \emph{deploy} method is \emph{wait\_all\_ready}.
+This argument has a default value of \emph{True}, and it is used as a barrier
+to force the START action to be invoked on all RMs being deploy only after
+they have all reached the state READY.
+
+\begin{lstlisting}[language=Python]
+
+ec.deploy(wait_all_ready=False)
+
+\end{lstlisting}
+
\subsection{Getting attributes}
+Attribute values can be retrieved at any moment during the experiment run,
+using the \emph{get} method.
+However, not all attributes can de modified after a resource has
+been deployed. The possibility of changing the value of a certain attribute
+depends strongly on the RM and on the attribute itself.
+As an example, once a \emph{hostname} has been specified for a certain Node
+RM, it might not be possible to change it after deployment.
+
+\begin{lstlisting}[language=Python]
+
+attr_value = ec.get(guid, "attr-name")
+
+\end{lstlisting}
+
+Attributes have flags that indicate whether their values can be changed
+and when it is possible to change them (e.g. before or after deployment,
+or both). These flags are \emph{NoFlags} (the attribute value can be
+modified always), \emph{ReadOnly} (the attribute value can never be
+modified), \emph{ExecReadOnly} (the attribute value can only be modified
+before deployment). The flags of a certain attribute can be validated
+as shown in the example below, and the value of the attribute can be
+changed using the \emph{set} method.
+
+\begin{lstlisting}[language=Python]
+
+from nepi.execution.attribute import Flags
+
+attr = ec.get_attribute(guid, "attr-name")
+
+if not attr.has_flag(Flags.ReadOnly):
+ ec.set(guid, "attr-name", attr_value)
+
+\end{lstlisting}
+
\subsection{Quering the state}
+It is possible to query the state of any resource at any moment.
+The state of a resource is requested using the \emph{state} method.
+This method receives the optional parameter \emph{hr} to output the
+state in a \emph{human readable} string format instead of an integer
+state code.
+
+\begin{lstlisting}[language=Python]
+
+state_id = ec.state(guid)
+
+# Human readable state
+state = ec.state(guid, hr = True)
+
+\end{lstlisting}
+
\subsection{Getting traces}
-
-% TODO: Give examples of Traces (how to collect traces to the local repo, talk about the Collector RM)
-% how to retrieve an application trace when the Node failed? (critical attribute)
+After a ResourceManager has been deployed it is possible to get information
+about the active traces and the trace streams of the generated data using
+the \emph{trace} method.
+
+Most traces are collected to a file in the host where they are generated,
+the total trace size and the file path in the (remote) host can be
+retrieved as follows.
+
+\begin{lstlisting}[language=Python]
+
+from nepi.execution.trace import TraceAttr
+
+path = ec.trace(guid, "trace-name", TraceAttr.PATH)
+size = ec.trace(guid, "trace-name", TraceAttr.SIZE)
+
+\end{lstlisting}
+
+The trace content can be retrieved in a stream, block by block.
+
+\begin{lstlisting}[language=Python]
+
+trace_block = ec.trace(guid, "trace-name", TraceAttr.STREAM, block=1, offset=0)
+
+\end{lstlisting}
+
+It is also possible to directly retrieve the complete trace content.
+
+\begin{lstlisting}[language=Python]
+
+trace_stream = ec.trace(guid, "trace-name")
+
+\end{lstlisting}
+
+Using the \emph{trace} method it is easy to collect all traces
+to the local user machine.
+
+\begin{lstlisting}[language=Python]
+
+for trace in ec.get_traces(guid):
+ trace_stream = ec.trace(guid, "trace-name")
+ f = open("trace-name", "w")
+ f.write(trace_stream)
+ f.close()
-\subsection{The collector RM}
+\end{lstlisting}
+
+
+% TODO: how to retrieve an application trace when the Node failed? (critical attribute)
+
+
+% \subsection{The collector RM}
+
+%%%%%%%%%%
+%% TODO
+%%%%%%%%%%%
+
+\subsection{API reference}
+
+Further information about classes and method signatures
+can be found using the Python \emph{help} method.
+For this inspection work, we recommend instantiating an
+ExperimentController from an IPython console. This is an
+interactive console that allows to dynamically send input
+to the python interpreter.
+
+If NEPI is not installed in the system, you will need to add the
+NEPI sources path to the PYTHONPATH environmental variable
+before invoking \emph{ipython}.
+
+\begin{lstlisting}[language=Python]
+
+$ PYTHONPATH=$PYTHONPATH:src ipython
+Python 2.7.3 (default, Jan 2 2013, 13:56:14)
+Type "copyright", "credits" or "license" for more information.
+
+
+IPython 0.13.1 -- An enhanced Interactive Python.
+? -> Introduction and overview of IPython's features.
+%quickref -> Quick reference.
+help -> Python's own help system.
+object? -> Details about 'object', use 'object??' for extra details.
+In [1]: from nepi.execution.ec import ExperimentController
+In [2]: ec = ExperimentController(exp_id = "test-tap")
+
+In [3]: help(ec.set)
+
+\end{lstlisting}
+
+The example above will show the following information related to the
+\emph{set} method of the EC API.
+
+\begin{lstlisting}[language=Python]
+
+Help on method set in module nepi.execution.ec:
+
+set(self, guid, name, value) method of nepi.execution.ec.ExperimentController instance
+ Modifies the value of the attribute with name 'name' on the RM with guid 'guid'.
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: Name of the attribute
+ :type name: str
+
+ :param value: Value of the attribute
+
+\end{lstlisting}
import weakref
class FailureLevel(object):
- """ Describes the system failure state
- """
+ """ Describes the system failure state """
OK = 1
RM_FAILURE = 2
EC_FAILURE = 3
class FailureManager(object):
- """ The FailureManager is responsible for handling errors,
- and deciding whether an experiment should be aborted
+ """ The FailureManager is responsible for handling errors
+ and deciding whether an experiment should be aborted or not
+
"""
def __init__(self, ec):
@property
def ec(self):
- """ Returns the Experiment Controller """
+ """ Returns the ExperimentController associated to this FailureManager
+
+ """
+
return self._ec()
@property
class ECState(object):
- """ State of the Experiment Controller
+ """ Possible states for an ExperimentController
"""
RUNNING = 1
.. note::
- An experiment, or scenario, is defined by a concrete set of resources,
- behavior, configuration and interconnection of those resources.
- The Experiment Description (ED) is a detailed representation of a
- single experiment. It contains all the necessary information to
- allow repeating the experiment. NEPI allows to describe
- experiments by registering components (resources), configuring them
- and interconnecting them.
-
- A same experiment (scenario) can be executed many times, generating
- different results. We call an experiment execution (instance) a 'run'.
-
- The ExperimentController (EC), is the entity responsible of
- managing an experiment run. The same scenario can be
- recreated (and re-run) by instantiating an EC and recreating
- the same experiment description.
-
- In NEPI, an experiment is represented as a graph of interconnected
- resources. A resource is a generic concept in the sense that any
- component taking part of an experiment, whether physical of
- virtual, is considered a resource. A resources could be a host,
- a virtual machine, an application, a simulator, a IP address.
-
- A ResourceManager (RM), is the entity responsible for managing a
- single resource. ResourceManagers are specific to a resource
- type (i.e. An RM to control a Linux application will not be
- the same as the RM used to control a ns-3 simulation).
- To support a new type of resource in NEPI, a new RM must be
- implemented. NEPI already provides a variety of
- RMs to control basic resources, and new can be extended from
- the existing ones.
-
- Through the EC interface the user can create ResourceManagers (RMs),
- configure them and interconnect them, to describe an experiment.
- Describing an experiment through the EC does not run the experiment.
- Only when the 'deploy()' method is invoked on the EC, the EC will take
- actions to transform the 'described' experiment into a 'running' experiment.
-
- While the experiment is running, it is possible to continue to
- create/configure/connect RMs, and to deploy them to involve new
- resources in the experiment (this is known as 'interactive' deployment).
-
- An experiments in NEPI is identified by a string id,
- which is either given by the user, or automatically generated by NEPI.
- The purpose of this identifier is to separate files and results that
- belong to different experiment scenarios.
- However, since a same 'experiment' can be run many times, the experiment
- id is not enough to identify an experiment instance (run).
- For this reason, the ExperimentController has two identifier, the
- exp_id, which can be re-used in different ExperimentController,
- and the run_id, which is unique to one ExperimentController instance, and
- is automatically generated by NEPI.
+ An experiment, or scenario, is defined by a concrete set of resources,
+ behavior, configuration and interconnection of those resources.
+ The Experiment Description (ED) is a detailed representation of a
+ single experiment. It contains all the necessary information to
+ allow repeating the experiment. NEPI allows to describe
+ experiments by registering components (resources), configuring them
+ and interconnecting them.
+
+ A same experiment (scenario) can be executed many times, generating
+ different results. We call an experiment execution (instance) a 'run'.
+
+ The ExperimentController (EC), is the entity responsible of
+ managing an experiment run. The same scenario can be
+ recreated (and re-run) by instantiating an EC and recreating
+ the same experiment description.
+
+ In NEPI, an experiment is represented as a graph of interconnected
+ resources. A resource is a generic concept in the sense that any
+ component taking part of an experiment, whether physical of
+ virtual, is considered a resource. A resources could be a host,
+ a virtual machine, an application, a simulator, a IP address.
+
+ A ResourceManager (RM), is the entity responsible for managing a
+ single resource. ResourceManagers are specific to a resource
+ type (i.e. An RM to control a Linux application will not be
+ the same as the RM used to control a ns-3 simulation).
+ To support a new type of resource in NEPI, a new RM must be
+ implemented. NEPI already provides a variety of
+ RMs to control basic resources, and new can be extended from
+ the existing ones.
+
+ Through the EC interface the user can create ResourceManagers (RMs),
+ configure them and interconnect them, to describe an experiment.
+ Describing an experiment through the EC does not run the experiment.
+ Only when the 'deploy()' method is invoked on the EC, the EC will take
+ actions to transform the 'described' experiment into a 'running' experiment.
+
+ While the experiment is running, it is possible to continue to
+ create/configure/connect RMs, and to deploy them to involve new
+ resources in the experiment (this is known as 'interactive' deployment).
+
+ An experiments in NEPI is identified by a string id,
+ which is either given by the user, or automatically generated by NEPI.
+ The purpose of this identifier is to separate files and results that
+ belong to different experiment scenarios.
+ However, since a same 'experiment' can be run many times, the experiment
+ id is not enough to identify an experiment instance (run).
+ For this reason, the ExperimentController has two identifier, the
+ exp_id, which can be re-used in different ExperimentController,
+ and the run_id, which is unique to one ExperimentController instance, and
+ is automatically generated by NEPI.
"""
def __init__(self, exp_id = None):
super(ExperimentController, self).__init__()
+
# Logging
self._logger = logging.getLogger("ExperimentController")
@property
def logger(self):
- """ Return the logger of the Experiment Controller
+ """ Returns the logger instance of the Experiment Controller
"""
return self._logger
@property
def ecstate(self):
- """ Return the state of the Experiment Controller
+ """ Returns the state of the Experiment Controller
"""
return self._state
@property
def exp_id(self):
- """ Return the experiment id assigned by the user
+ """ Returns the experiment id assigned by the user
"""
return self._exp_id
@property
def run_id(self):
- """ Return the experiment instance (run) identifier
+ """ Returns the experiment instance (run) identifier (automatically
+ generated)
"""
return self._run_id
@property
def abort(self):
+ """ Returns True if the experiment has failed and should be interrupted,
+ False otherwise.
+
+ """
return self._fm.abort
def wait_finished(self, guids):
- """ Blocking method that wait until all RMs in the 'guid' list
- reach a state >= STOPPED (i.e. STOPPED, FAILED or
- RELEASED ) or until a System Failure occurs (e.g. Task Failure)
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state >= STOPPED (i.e. STOPPED, FAILED or
+ RELEASED ), or until a failure in the experiment occurs
+ (i.e. abort == True)
- :param guids: List of guids
- :type guids: list
+ :param guids: List of guids
+ :type guids: list
"""
quit = quit)
def wait_started(self, guids):
- """ Blocking method that wait until all RMs in the 'guid' list
- reach a state >= STARTED or until a System Failure occurs
- (e.g. Task Failure)
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state >= STARTED, or until a failure in the
+ experiment occurs (i.e. abort == True)
+
+ :param guids: List of guids
+ :type guids: list
- :param guids: List of guids
- :type guids: list
"""
def quit():
quit = quit)
def wait_released(self, guids):
- """ Blocking method that wait until all RMs in the 'guid' list
- reach a state = RELEASED or until the EC fails
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state == RELEASED, or until the EC fails
+
+ :param guids: List of guids
+ :type guids: list
- :param guids: List of guids
- :type guids: list
"""
def quit():
quit = quit)
def wait_deployed(self, guids):
- """ Blocking method that wait until all RMs in the 'guid' list
- reach a state >= READY or until a System Failure occurs
- (e.g. Task Failure)
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state >= READY, or until a failure in the
+ experiment occurs (i.e. abort == True)
+
+ :param guids: List of guids
+ :type guids: list
- :param guids: List of guids
- :type guids: list
"""
def quit():
quit = quit)
def wait(self, guids, state, quit):
- """ Blocking method that wait until all RMs in the 'guid' list
- reach a state >= 'state' or until quit yileds True
+ """ Blocking method that waits until all RMs in the 'guids' list
+ have reached a state >= 'state', or until the 'quit' callback
+ yields True
- :param guids: List of guids
- :type guids: list
+ :param guids: List of guids
+ :type guids: list
+
"""
+
if isinstance(guids, int):
guids = [guids]
time.sleep(0.5)
def get_task(self, tid):
- """ Get a specific task
+ """ Returns a task by its id
- :param tid: Id of the task
- :type tid: int
- :rtype: Task
+ :param tid: Id of the task
+ :type tid: int
+
+ :rtype: Task
+
"""
return self._tasks.get(tid)
def get_resource(self, guid):
- """ Get a specific Resource Manager
+ """ Returns a registered ResourceManager by its guid
- :param guid: Id of the task
- :type guid: int
- :rtype: ResourceManager
+ :param guid: Id of the task
+ :type guid: int
+
+ :rtype: ResourceManager
+
"""
return self._resources.get(guid)
@property
def resources(self):
- """ Returns the list of all the Resource Manager Id
+ """ Returns the set() of guids of all the ResourceManager
- :rtype: set
+ :return: Set of all RM guids
+ :rtype: set
"""
return self._resources.keys()
def register_resource(self, rtype, guid = None):
- """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
- for the RM of type 'rtype' and add it to the list of Resources.
+ """ Registers a new ResourceManager of type 'rtype' in the experiment
+
+ This method will assign a new 'guid' for the RM, if no guid
+ is specified.
- :param rtype: Type of the RM
- :type rtype: str
- :return: Id of the RM
- :rtype: int
+ :param rtype: Type of the RM
+ :type rtype: str
+
+ :return: Guid of the RM
+ :rtype: int
+
"""
# Get next available guid
guid = self._guid_generator.next(guid)
return guid
def get_attributes(self, guid):
- """ Return all the attibutes of a specific RM
+ """ Returns all the attributes of the RM with guid 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :return: List of attributes
+ :rtype: list
- :param guid: Guid of the RM
- :type guid: int
- :return: List of attributes
- :rtype: list
"""
rm = self.get_resource(guid)
return rm.get_attributes()
+ def get_attribute(self, guid, name):
+ """ Returns the attribute 'name' of the RM with guid 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: Name of the attribute
+ :type name: str
+
+ :return: The attribute with name 'name'
+ :rtype: Attribute
+
+ """
+ rm = self.get_resource(guid)
+ return rm.get_attribute(name)
+
def register_connection(self, guid1, guid2):
- """ Registers a guid1 with a guid2.
- The declaration order is not important
+ """ Registers a connection between a RM with guid 'guid1'
+ and another RM with guid 'guid2'.
+
+ The order of the in which the two guids are provided is not
+ important, since the connection relationship is symmetric.
:param guid1: First guid to connect
:type guid1: ResourceManager
:param guid2: Second guid to connect
:type guid: ResourceManager
+
"""
rm1 = self.get_resource(guid1)
rm2 = self.get_resource(guid2)
def register_condition(self, guids1, action, guids2, state,
time = None):
- """ Registers an action START or STOP for all RM on guids1 to occur
- time 'time' after all elements in guids2 reached state 'state'.
+ """ Registers an action START, STOP or DEPLOY for all RM on list
+ guids1 to occur at time 'time' after all elements in list guids2
+ have reached state 'state'.
:param guids1: List of guids of RMs subjected to action
:type guids1: list
- :param action: Action to register (either START or STOP)
+ :param action: Action to perform (either START, STOP or DEPLOY)
:type action: ResourceAction
:param guids2: List of guids of RMs to we waited for
:type guids2: list
- :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+ :param state: State to wait for on RMs of list guids2 (STARTED,
+ STOPPED, etc)
:type state: ResourceState
:param time: Time to wait after guids2 has reached status
rm.register_condition(action, guids2, state, time)
def enable_trace(self, guid, name):
- """ Enable trace
+ """ Enables a trace to be collected during the experiment run
+
+ :param name: Name of the trace
+ :type name: str
- :param name: Name of the trace
- :type name: str
"""
rm = self.get_resource(guid)
rm.enable_trace(name)
def trace_enabled(self, guid, name):
- """ Returns True if trace is enabled
+ """ Returns True if the trace of name 'name' is enabled
+
+ :param name: Name of the trace
+ :type name: str
- :param name: Name of the trace
- :type name: str
"""
rm = self.get_resource(guid)
return rm.trace_enabled(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
- """ Get information on collected trace
+ """ Returns information on a collected trace, the trace stream or
+ blocks (chunks) of the trace stream
- :param name: Name of the trace
- :type name: str
+ :param name: Name of the trace
+ :type name: str
- :param attr: Can be one of:
+ :param attr: Can be one of:
- TraceAttr.ALL (complete trace content),
- - TraceAttr.STREAM (block in bytes to read starting at offset),
+ - TraceAttr.STREAM (block in bytes to read starting
+ at offset),
- TraceAttr.PATH (full path to the trace file),
- TraceAttr.SIZE (size of trace file).
- :type attr: str
+ :type attr: str
- :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
- :type name: int
+ :param block: Number of bytes to retrieve from trace, when attr is
+ TraceAttr.STREAM
+ :type name: int
- :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
- :type name: int
+ :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :rtype: str
- :rtype: str
"""
rm = self.get_resource(guid)
return rm.trace(name, attr, block, offset)
+ def get_traces(self, guid):
+ """ Returns the list of the trace names of the RM with guid 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :return: List of trace names
+ :rtype: list
+
+ """
+ rm = self.get_resource(guid)
+ return rm.get_traces()
+
+
def discover(self, guid):
- """ Discover a specific RM defined by its 'guid'
+ """ Discovers an available resource matching the criteria defined
+ by the RM with guid 'guid', and associates that resource to the RM
+
+ Not all RM types require (or are capable of) performing resource
+ discovery. For the RM types which are not capable of doing so,
+ invoking this method does not have any consequences.
:param guid: Guid of the RM
:type guid: int
return rm.discover()
def provision(self, guid):
- """ Provision a specific RM defined by its 'guid'
+ """ Provisions the resource associated to the RM with guid 'guid'.
+
+ Provisioning means making a resource 'accessible' to the user.
+ Not all RM types require (or are capable of) performing resource
+ provisioning. For the RM types which are not capable of doing so,
+ invoking this method does not have any consequences.
:param guid: Guid of the RM
:type guid: int
return rm.provision()
def get(self, guid, name):
- """ Get a specific attribute 'name' from the RM 'guid'
+ """ Returns the value of the attribute with name 'name' on the
+ RM with guid 'guid'
:param guid: Guid of the RM
:type guid: int
- :param name: attribute's name
+ :param name: Name of the attribute
:type name: str
+ :return: The value of the attribute with name 'name'
+
"""
rm = self.get_resource(guid)
return rm.get(name)
def set(self, guid, name, value):
- """ Set a specific attribute 'name' from the RM 'guid'
- with the value 'value'
+ """ Modifies the value of the attribute with name 'name' on the
+ RM with guid 'guid'.
:param guid: Guid of the RM
:type guid: int
- :param name: attribute's name
+ :param name: Name of the attribute
:type name: str
- :param value: attribute's value
+ :param value: Value of the attribute
"""
rm = self.get_resource(guid)
return state
def stop(self, guid):
- """ Stop a specific RM defined by its 'guid'
+ """ Stops the RM with guid 'guid'
+
+ Stopping a RM means that the resource it controls will
+ no longer take part of the experiment.
:param guid: Guid of the RM
:type guid: int
return rm.stop()
def start(self, guid):
- """ Start a specific RM defined by its 'guid'
+ """ Starts the RM with guid 'guid'
+
+ Starting a RM means that the resource it controls will
+ begin taking part of the experiment.
:param guid: Guid of the RM
:type guid: int
def set_with_conditions(self, name, value, guids1, guids2, state,
time = None):
- """ Set value 'value' on attribute with name 'name' on all RMs of
- guids1 when 'time' has elapsed since all elements in guids2
- have reached state 'state'.
+ """ Modifies the value of attribute with name 'name' on all RMs
+ on the guids1 list when time 'time' has elapsed since all
+ elements in guids2 list have reached state 'state'.
:param name: Name of attribute to set in RM
:type name: string
rm.set_with_conditions(name, value, guids2, state, time)
def deploy(self, guids = None, wait_all_ready = True, group = None):
- """ Deploy all resource manager in guids list
+ """ Deploys all ResourceManagers in the guids list.
+
+ If the argument 'guids' is not given, all RMs with state NEW
+ are deployed.
- :param guids: List of guids of RMs to deploy
- :type guids: list
+ :param guids: List of guids of RMs to deploy
+ :type guids: list
- :param wait_all_ready: Wait until all RMs are ready in
- order to start the RMs
- :type guid: int
+ :param wait_all_ready: Wait until all RMs are ready in
+ order to start the RMs
+ :type guid: int
- :param group: Id of deployment group in which to deploy RMs
- :type group: int
+ :param group: Id of deployment group in which to deploy RMs
+ :type group: int
"""
self.logger.debug(" ------- DEPLOY START ------ ")
self.schedule("0s", rm.stop_with_conditions)
def release(self, guids = None):
- """ Release al RMs on the guids list or
- all the resources if no list is specified
+ """ Releases all ResourceManagers in the guids list.
+
+ If the argument 'guids' is not given, all RMs registered
+ in the experiment are released.
:param guids: List of RM guids
:type guids: list
self.wait_released(guids)
def shutdown(self):
- """ Shutdown the Experiment Controller.
- Releases all the resources and stops task processing thread
+ """ Releases all resources and stops the ExperimentController
"""
# If there was a major failure we can't exit gracefully
self._thread.join()
def schedule(self, date, callback, track = False):
- """ Schedule a callback to be executed at time date.
+ """ Schedules a callback to be executed at time 'date'.
:param date: string containing execution time for the task.
It can be expressed as an absolute time, using
Python function, and receives args and kwargs
as arguments.
- :param track: if set to True, the task will be retrivable with
+ :param track: if set to True, the task will be retrievable with
the get_task() method
:return : The Id of the task
+ :rtype: int
+
"""
timestamp = stabsformat(date)
task = Task(timestamp, callback)
""" Process scheduled tasks.
.. note::
+
+ Tasks are scheduled by invoking the schedule method with a target
+ callback and an execution time.
+ The schedule method creates a new Task object with that callback
+ and execution time, and pushes it into the '_scheduler' queue.
+ The execution time and the order of arrival of tasks are used
+ to order the tasks in the queue.
+
+ The _process method is executed in an independent thread held by
+ the ExperimentController for as long as the experiment is running.
+ This method takes tasks from the '_scheduler' queue in a loop
+ and processes them in parallel using multithreading.
+ The environmental variable NEPI_NTHREADS can be used to control
+ the number of threads used to process tasks. The default value is
+ 50.
- The _process method is executed in an independent thread held by the
- ExperimentController for as long as the experiment is running.
+ To execute tasks in parallel, a ParallelRunner (PR) object is used.
+ This object keeps a pool of threads (workers), and a queue of tasks
+ scheduled for 'immediate' execution.
- Tasks are scheduled by invoking the schedule method with a target callback.
- The schedule method is given a execution time which controls the
- order in which tasks are processed.
+ On each iteration, the '_process' loop will take the next task that
+ is scheduled for 'future' execution from the '_scheduler' queue,
+ and if the execution time of that task is >= to the current time,
+ it will push that task into the PR for 'immediate execution'.
+ As soon as a worker is free, the PR will assign the next task to
+ that worker.
- Tasks are processed in parallel using multithreading.
- The environmental variable NEPI_NTHREADS can be used to control
- the number of threads used to process tasks. The default value is 50.
-
- Exception handling:
-
- To execute tasks in parallel, an ParallelRunner (PR) object, holding
- a pool of threads (workers), is used.
- For each available thread in the PR, the next task popped from
- the scheduler queue is 'put' in the PR.
- Upon receiving a task to execute, each PR worker (thread) invokes the
- _execute method of the EC, passing the task as argument.
- This method, calls task.callback inside a try/except block. If an
- exception is raised by the tasks.callback, it will be trapped by the
- try block, logged to standard error (usually the console), and the EC
- state will be set to ECState.FAILED.
- The invocation of _notify immediately after, forces the processing
- loop in the _process method, to wake up if it was blocked waiting for new
- tasks to arrived, and to check the EC state.
- As the EC is in FAILED state, the processing loop exits and the
- 'finally' block is invoked. In the 'finally' block, the 'sync' method
- of the PR is invoked, which forces the PR to raise any unchecked errors
- that might have been raised by the workers.
+ Upon receiving a task to execute, each PR worker (thread) will
+ invoke the _execute method of the EC, passing the task as
+ argument.
+ The _execute method will then invoke task.callback inside a
+ try/except block. If an exception is raised by the tasks.callback,
+ it will be trapped by the try block, logged to standard error
+ (usually the console), and the task will be marked as failed.
"""
:param task: Object containing the callback to execute
:type task: Task
- .. note::
-
- If the invokation of the task callback raises an
- exception, the processing thread of the ExperimentController
- will be stopped and the experiment will be aborted.
-
"""
# Invoke callback
task.status = TaskStatus.DONE
self.logger.error("Error occurred while executing task: %s" % err)
def _notify(self):
- """ Awakes the processing thread in case it is blocked waiting
- for a new task to be scheduled.
+ """ Awakes the processing thread if it is blocked waiting
+ for new tasks to arrive
+
"""
self._cond.acquire()
self._cond.notify()