from nepi.util import guid
from nepi.util.parallel import ParallelRun
-from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
+from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
-# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
+# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
class ECState(object):
""" State of the Experiment Controller
"""
.. class:: Class Args :
- :param exp_id: Human readable identifier for the experiment.
- It will be used in the name of the directory
+ :param exp_id: Human readable identifier for the experiment scenario.
+ It will be used in the name of the directory
where experiment related information is stored
- :type exp_id: int
-
- :param root_dir: Root directory where experiment specific folder
- will be created to store experiment information
- :type root_dir: str
+ :type exp_id: str
.. note::
+
+ An experiment, or scenario, is defined by a concrete use, behavior,
+ configuration and interconnection of resources that describe a single
+ experiment case (We call this the experiment description).
+ A same experiment (scenario) can be run many times.
+
The ExperimentController (EC), is the entity responsible for
- managing a single experiment.
+ managing an experiment instance (run). The same scenario can be
+ recreated (and re-run) by instantiating an EC and recreating
+ the same experiment description.
+
+ In NEPI, an experiment is represented as a graph of interconnected
+ resources. A resource is a generic concept in the sense that any
+ component taking part of an experiment, whether physical of
+ virtual, is considered a resource. A resources could be a host,
+ a virtual machine, an application, a simulator, a IP address.
+
+ A ResourceManager (RM), is the entity responsible for managing a
+ single resource. ResourceManagers are specific to a resource
+ type (i.e. An RM to control a Linux application will not be
+ the same as the RM used to control a ns-3 simulation).
+ In order for a new type of resource to be supported in NEPI
+ a new RM must be implemented. NEPI already provides different
+ RMs to control basic resources, and new can be extended from
+ the existing ones.
+
Through the EC interface the user can create ResourceManagers (RMs),
- configure them and interconnect them, in order to describe the experiment.
-
- Only when the 'deploy()' method is invoked, the EC will take actions
- to transform the 'described' experiment into a 'running' experiment.
+ configure them and interconnect them, in order to describe an experiment.
+ Describing an experiment through the EC does not run the experiment.
+ Only when the 'deploy()' method is invoked on the EC, will the EC take
+ actions to transform the 'described' experiment into a 'running' experiment.
While the experiment is running, it is possible to continue to
create/configure/connect RMs, and to deploy them to involve new
- resources in the experiment.
-
+ resources in the experiment (this is known as 'interactive' deployment).
+
+ An experiments in NEPI is identified by a string id,
+ which is either given by the user, or automatically generated by NEPI.
+ The purpose of this identifier is to separate files and results that
+ belong to different experiment scenarios.
+ However, since a same 'experiment' can be run many times, the experiment
+ id is not enough to identify an experiment instance (run).
+ For this reason, the ExperimentController has two identifier, the
+ exp_id, which can be re-used by different ExperimentController instances,
+ and the run_id, which unique to a ExperimentController instance, and
+ is automatically generated by NEPI.
+
"""
- def __init__(self, exp_id = None, root_dir = "/tmp"):
+ def __init__(self, exp_id = None):
super(ExperimentController, self).__init__()
# root directory to store files
- self._root_dir = root_dir
- # experiment identifier given by the user
- self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+ # Run identifier. It identifies a concrete instance (run) of an experiment.
+ # Since a same experiment (same configuration) can be run many times,
+ # this id permits to identify concrete exoeriment run
+ self._run_id = tsformat()
+
+ # Experiment identifier. Usually assigned by the user
+ self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
@property
def exp_id(self):
- """ Return the experiment ID
+ """ Return the experiment id assigned by the user
"""
- exp_id = self._exp_id
- if not exp_id.startswith("nepi-"):
- exp_id = "nepi-" + exp_id
- return exp_id
+ return self._exp_id
+
+ @property
+ def run_id(self):
+ """ Return the experiment instance (run) identifier
+
+ """
+ return self._run_id
@property
def finished(self):
:param guids: List of guids
:type guids: list
"""
- return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
+ return self.wait(guids, states = [ResourceState.STARTED,
+ ResourceState.STOPPED,
+ ResourceState.FINISHED])
- def wait(self, guids, states = [ResourceState.FINISHED]):
+ def wait(self, guids, states = [ResourceState.FINISHED,
+ ResourceState.STOPPED]):
""" Blocking method that waits until all the RM from the 'guid' list
reached state 'state' or until a failure occurs
while not all([self.state(guid) in states for guid in guids]) and \
not any([self.state(guid) in [
- ResourceState.STOPPED,
ResourceState.FAILED] for guid in guids]) and \
not self.finished:
+ # debug logging
+ waited = ""
+ for guid in guids:
+ waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
+ self.logger.debug(" WAITING FOR %s " % waited )
+
# We keep the sleep big to decrease the number of RM state queries
time.sleep(2)
:param tid: Id of the task
:type tid: int
- :rtype: unknow
+ :rtype: Task
"""
return self._tasks.get(tid)
:param guid: Id of the task
:type guid: int
- :rtype: ResourceManager
+ :rtype: ResourceManager
"""
return self._resources.get(guid)
rm1 = self.get_resource(guid1)
rm2 = self.get_resource(guid2)
- rm1.connect(guid2)
- rm2.connect(guid1)
+ rm1.register_connection(guid2)
+ rm2.register_connection(guid1)
def register_condition(self, group1, action, group2, state,
time = None):
"""
rm = self.get_resource(guid)
+ state = rm.state
+
if hr:
- return ResourceState2str.get(rm.state)
+ return ResourceState2str.get(state)
- return rm.state
+ return state
def stop(self, guid):
""" Stop a specific RM defined by its 'guid'
# same conditions (e.g. LinuxApplications running on a same
# node share a single lock, so they will tend to be serialized).
# If we disorder the group list, this problem can be mitigated.
- #random.shuffle(group)
+ random.shuffle(group)
def wait_all_and_start(group):
reschedule = False
for guid in group:
- rm = self.get_resource(guid)
- if rm.state < ResourceState.READY:
+ if self.state(guid) < ResourceState.READY:
reschedule = True
break
# schedule a stop. Otherwise the RM will stop immediately
self.schedule("2s", rm.stop_with_conditions)
-
def release(self, group = None):
""" Release the elements of the list 'group' or
all the resources if any group is specified
:return : The Id of the task
"""
- timestamp = strfvalid(date)
-
+ timestamp = stabsformat(date)
task = Task(timestamp, callback)
task = self._scheduler.schedule(task)
if track:
self._tasks[task.id] = task
-
+
# Notify condition to wake up the processing thread
self._notify()
def _process(self):
""" Process scheduled tasks.
+ .. note::
+
The _process method is executed in an independent thread held by the
ExperimentController for as long as the experiment is running.
try:
while not self.finished:
self._cond.acquire()
+
task = self._scheduler.next()
- self._cond.release()
if not task:
- # It there are not tasks in the tasks queue we need to
- # wait until a call to schedule wakes us up
- self._cond.acquire()
+ # No task to execute. Wait for a new task to be scheduled.
self._cond.wait()
- self._cond.release()
- else:
- # If the task timestamp is in the future the thread needs to wait
- # until time elapse or until another task is scheduled
- now = strfnow()
+ else:
+ # The task timestamp is in the future. Wait for timeout
+ # or until another task is scheduled.
+ now = tnow()
if now < task.timestamp:
- # Calculate time difference in seconds
- timeout = strfdiff(task.timestamp, now)
+ # Calculate timeout in seconds
+ timeout = tdiffsec(task.timestamp, now)
+
# Re-schedule task with the same timestamp
self._scheduler.schedule(task)
- # Sleep until timeout or until a new task awakes the condition
- self._cond.acquire()
+
+ task = None
+
+ # Wait timeout or until a new task awakes the condition
self._cond.wait(timeout)
- self._cond.release()
- else:
- # Process tasks in parallel
- runner.put(self._execute, task)
+
+ self._cond.release()
+
+ if task:
+ # Process tasks in parallel
+ runner.put(self._execute, task)
except:
import traceback
err = traceback.format_exc()
- self._logger.error("Error while processing tasks in the EC: %s" % err)
+ self.logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
finally:
- self._logger.info("Exiting the task processing loop ... ")
+ self.logger.debug("Exiting the task processing loop ... ")
runner.sync()
def _execute(self, task):
""" Executes a single task.
- If the invokation of the task callback raises an
- exception, the processing thread of the ExperimentController
- will be stopped and the experiment will be aborted.
-
:param task: Object containing the callback to execute
:type task: Task
+ .. note::
+
+ If the invokation of the task callback raises an
+ exception, the processing thread of the ExperimentController
+ will be stopped and the experiment will be aborted.
+
"""
# Invoke callback
task.status = TaskStatus.DONE
task.result = err
task.status = TaskStatus.ERROR
- self._logger.error("Error occurred while executing task: %s" % err)
+ self.logger.error("Error occurred while executing task: %s" % err)
# Set the EC to FAILED state (this will force to exit the task
# processing thread)