import time
import threading
+class FailurePolicy(object):
+ """ Defines how to respond to experiment failures
+ """
+ IGNORE_RM_FAILURE = 1
+ ABORT_ON_RM_FAILURE = 2
+
+class FailureLevel(object):
+ """ Describe the system failure state
+ """
+ OK = 1
+ RM_FAILURE = 2
+ TASK_FAILURE = 3
+ EC_FAILURE = 4
+
+class FailureManager(object):
+ """ The FailureManager is responsible for handling errors,
+ and deciding whether an experiment should be aborted
+ """
+
+ def __init__(self, failure_policy = None):
+ self._failure_level = FailureLevel.OK
+ self._failure_policy = failure_policy or \
+ FailurePolicy.ABORT_ON_RM_FAILURE
+
+ @property
+ def abort(self):
+ if self._failure_level == FailureLevel.EC_FAILURE:
+ return True
+
+ if self._failure_level in [FailureLevel.TASK_FAILURE,
+ FailureLevel.RM_FAILURE] and \
+ self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
+ return True
+
+ return False
+
+ def set_rm_failure(self):
+ self._failure_level = FailureLevel.RM_FAILURE
+
+ def set_task_failure(self):
+ self._failure_level = FailureLevel.TASK_FAILURE
+
+ def set_ec_failure(self):
+ self._failure_level = FailureLevel.EC_FAILURE
+
class ECState(object):
""" State of the Experiment Controller
.. class:: Class Args :
:param exp_id: Human readable identifier for the experiment scenario.
- It will be used in the name of the directory
- where experiment related information is stored
:type exp_id: str
.. note::
- An experiment, or scenario, is defined by a concrete use, behavior,
- configuration and interconnection of resources that describe a single
- experiment case (We call this the experiment description).
- A same experiment (scenario) can be run many times.
+ An experiment, or scenario, is defined by a concrete set of resources,
+ behavior, configuration and interconnection of those resources.
+ The Experiment Description (ED) is a detailed representation of a
+ single experiment. It contains all the necessary information to
+ allow repeating the experiment. NEPI allows to describe
+ experiments by registering components (resources), configuring them
+ and interconnecting them.
+
+ A same experiment (scenario) can be executed many times, generating
+ different results. We call an experiment execution (instance) a 'run'.
- The ExperimentController (EC), is the entity responsible for
- managing an experiment instance (run). The same scenario can be
+ The ExperimentController (EC), is the entity responsible of
+ managing an experiment run. The same scenario can be
recreated (and re-run) by instantiating an EC and recreating
the same experiment description.
single resource. ResourceManagers are specific to a resource
type (i.e. An RM to control a Linux application will not be
the same as the RM used to control a ns-3 simulation).
- In order for a new type of resource to be supported in NEPI
- a new RM must be implemented. NEPI already provides different
+ To support a new type of resource in NEPI, a new RM must be
+ implemented. NEPI already provides a variety of
RMs to control basic resources, and new can be extended from
the existing ones.
Through the EC interface the user can create ResourceManagers (RMs),
- configure them and interconnect them, in order to describe an experiment.
+ configure them and interconnect them, to describe an experiment.
Describing an experiment through the EC does not run the experiment.
- Only when the 'deploy()' method is invoked on the EC, will the EC take
+ Only when the 'deploy()' method is invoked on the EC, the EC will take
actions to transform the 'described' experiment into a 'running' experiment.
While the experiment is running, it is possible to continue to
However, since a same 'experiment' can be run many times, the experiment
id is not enough to identify an experiment instance (run).
For this reason, the ExperimentController has two identifier, the
- exp_id, which can be re-used by different ExperimentController instances,
- and the run_id, which unique to a ExperimentController instance, and
+ exp_id, which can be re-used in different ExperimentController,
+ and the run_id, which is unique to one ExperimentController instance, and
is automatically generated by NEPI.
"""
# Logging
self._logger = logging.getLogger("ExperimentController")
- # Run identifier. It identifies a concrete instance (run) of an experiment.
- # Since a same experiment (same configuration) can be run many times,
- # this id permits to identify concrete exoeriment run
+ # Run identifier. It identifies a concrete execution instance (run)
+ # of an experiment.
+ # Since a same experiment (same configuration) can be executed many
+ # times, this run_id permits to separate result files generated on
+ # different experiment executions
self._run_id = tsformat()
# Experiment identifier. Usually assigned by the user
+ # Identifies the experiment scenario (i.e. configuration,
+ # resources used, etc)
self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
# generator of globally unique ids
# Tasks
self._tasks = dict()
- # RM groups
+ # RM groups (for deployment)
self._groups = dict()
# generator of globally unique id for groups
self._thread.setDaemon(True)
self._thread.start()
+ # Flag to stop processing thread
+ self._stop = False
+
+ # Entity in charge of managing system failures
+ self._fm = FailureManager()
+
# EC state
self._state = ECState.RUNNING
return self._run_id
@property
- def finished(self):
- """ Put the state of the Experiment Controller into a final state :
- Either TERMINATED or FAILED
+ def abort(self):
+ return self._fm.abort
- """
- return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+ def set_rm_failure(self):
+ self._fm.set_rm_failure()
def wait_finished(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or
+ RELEASED ) or until a System Failure occurs (e.g. Task Failure)
:param guids: List of guids
:type guids: list
+
"""
- return self.wait(guids)
+
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.STOPPED,
+ quit = quit)
def wait_started(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state >= STARTED or until a System Failure occurs
+ (e.g. Task Failure)
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, state = ResourceState.STARTED)
+
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.STARTED,
+ quit = quit)
def wait_released(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reached the state RELEASED (or FAILED)
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state = RELEASED or until the EC fails
:param guids: List of guids
:type guids: list
"""
- # TODO: solve state concurrency BUG and !!!!
- # correct waited release state to state = ResourceState.FAILED)
- return self.wait(guids, state = ResourceState.FINISHED)
+
+ def quit():
+ return self._state == ECState.FAILED
+
+ return self.wait(guids, state = ResourceState.RELEASED,
+ quit = quit)
def wait_deployed(self, guids):
- """ Blocking method that wait until all the RM from the 'guid' list
- reached the state READY (or any higher state)
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state >= READY or until a System Failure occurs
+ (e.g. Task Failure)
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, state = ResourceState.READY)
- def wait(self, guids, state = ResourceState.STOPPED):
- """ Blocking method that waits until all the RM from the 'guid' list
- reached state 'state' or until a failure occurs
-
+ def quit():
+ return self.abort
+
+ return self.wait(guids, state = ResourceState.READY,
+ quit = quit)
+
+ def wait(self, guids, state, quit):
+ """ Blocking method that wait until all RMs in the 'guid' list
+ reach a state >= 'state' or until quit yileds True
+
:param guids: List of guids
:type guids: list
"""
if isinstance(guids, int):
guids = [guids]
- # we randomly alter the order of the guids to avoid ordering
- # dependencies (e.g. LinuxApplication RMs runing on the same
- # linux host will be synchronized by the LinuxNode SSH lock)
- random.shuffle(guids)
-
while True:
- # If no more guids to wait for or an error occured, then exit
- if len(guids) == 0 or self.finished:
+ # If there are no more guids to wait for
+ # or the quit function returns True, exit the loop
+ if len(guids) == 0 or quit():
break
# If a guid reached one of the target states, remove it from list
guids.remove(guid)
else:
# Debug...
- self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
- self.state(guid, hr = True), state))
-
- # Take the opportunity to 'refresh' the states of the RMs.
- # Query only the first up to N guids (not to overwhelm
- # the local machine)
- n = 100
- lim = n if len(guids) > n else ( len(guids) -1 )
- nguids = guids[0: lim]
-
- # schedule state request for all guids (take advantage of
- # scheduler multi threading).
- for guid in nguids:
- callback = functools.partial(self.state, guid)
- self.schedule("0s", callback)
-
- # If the guid is not in one of the target states, wait and
- # continue quering. We keep the sleep big to decrease the
- # number of RM state queries
- time.sleep(4)
+ hrstate = ResourceState2str.get(rstate)
+ self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+ guid, rstate, state))
+
+ time.sleep(0.5)
def get_task(self, tid):
""" Get a specific task
self.logger.debug(" ------- DEPLOY START ------ ")
if not guids:
- # If no guids list was indicated, all 'NEW' RMs will be deployed
+ # If no guids list was passed, all 'NEW' RMs will be deployed
guids = []
for guid in self.resources:
if self.state(guid) == ResourceState.NEW:
guids = [guids]
# Create deployment group
+ # New guids can be added to a same deployment group later on
new_group = False
if not group:
new_group = True
self._groups[group].extend(guids)
- # Before starting deployment we disorder the guids list with the
- # purpose of speeding up the whole deployment process.
- # It is likely that the user inserted in the 'guids' list closely
- # resources one after another (e.g. all applications
- # connected to the same node can likely appear one after another).
- # This can originate a slow down in the deployment since the N
- # threads the parallel runner uses to processes tasks may all
- # be taken up by the same family of resources waiting for the
- # same conditions (e.g. LinuxApplications running on a same
- # node share a single lock, so they will tend to be serialized).
- # If we disorder the guids list, this problem can be mitigated.
- random.shuffle(guids)
-
def wait_all_and_start(group):
+ # Function that checks if all resources are READY
+ # before scheduling a start_with_conditions for each RM
reschedule = False
# Get all guids in group
if wait_all_ready and new_group:
# Schedule a function to check that all resources are
# READY, and only then schedule the start.
- # This aimes at reducing the number of tasks looping in the
+ # This aims at reducing the number of tasks looping in the
# scheduler.
- # Intead of having N start tasks, we will have only one for
+ # Instead of having many start tasks, we will have only one for
# the whole group.
callback = functools.partial(wait_all_and_start, group)
self.schedule("1s", callback)
Releases all the resources and stops task processing thread
"""
+ # TODO: Clean the parallel runner!! STOP all ongoing tasks
+ ####
+
self.release()
# Mark the EC state as TERMINATED
self._state = ECState.TERMINATED
+ # Stop processing thread
+ self._stop = True
+
# Notify condition to wake up the processing thread
self._notify()
runner = ParallelRun(maxthreads = nthreads)
runner.start()
- try:
- while not self.finished:
+ while not self._stop:
+ try:
self._cond.acquire()
task = self._scheduler.next()
if task:
# Process tasks in parallel
runner.put(self._execute, task)
- except:
- import traceback
- err = traceback.format_exc()
- self.logger.error("Error while processing tasks in the EC: %s" % err)
+ except:
+ import traceback
+ err = traceback.format_exc()
+ self.logger.error("Error while processing tasks in the EC: %s" % err)
- self._state = ECState.FAILED
- finally:
- self.logger.debug("Exiting the task processing loop ... ")
- runner.sync()
- runner.destroy()
+ # Set the EC to FAILED state
+ self._state = ECState.FAILED
+
+ # Set the FailureManager failure level
+ self._fm.set_ec_failure()
+
+ self.logger.debug("Exiting the task processing loop ... ")
+ runner.sync()
+ runner.destroy()
def _execute(self, task):
""" Executes a single task.
self.logger.error("Error occurred while executing task: %s" % err)
- # Set the EC to FAILED state (this will force to exit the task
- # processing thread)
- self._state = ECState.FAILED
-
- # Notify condition to wake up the processing thread
- self._notify()
-
- # Propage error to the ParallelRunner
- raise
+ # Set the FailureManager failure level
+ self._fm.set_task_failure()
def _notify(self):
""" Awakes the processing thread in case it is blocked waiting