# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
class ECState(object):
+ """ State of the Experiment Controller
+
+ """
RUNNING = 1
FAILED = 2
TERMINATED = 3
class ExperimentController(object):
+ """
+ .. class:: Class Args :
+
+ :param exp_id: Id of the experiment
+ :type exp_id: int
+ :param root_dir: Root directory of the experiment
+ :type root_dir: str
+
+ .. note::
+
+ This class is the only one used by the User. Indeed, the user "talks"
+ only with the Experiment Controller and this latter forward to
+ the different Resources Manager the order provided by the user.
+
+ """
+
def __init__(self, exp_id = None, root_dir = "/tmp"):
super(ExperimentController, self).__init__()
# root directory to store files
@property
def logger(self):
+ """ Return the logger of the Experiment Controller
+
+ """
return self._logger
@property
def ecstate(self):
+ """ Return the state of the Experiment Controller
+
+ """
return self._state
@property
def exp_id(self):
+ """ Return the experiment ID
+
+ """
exp_id = self._exp_id
if not exp_id.startswith("nepi-"):
exp_id = "nepi-" + exp_id
@property
def finished(self):
+ """ Put the state of the Experiment Controller into a final state :
+ Either TERMINATED or FAILED
+
+ """
return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
def wait_finished(self, guids):
- # Take into account if only one guids is given in parameter
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reach the state FINISHED
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ if isinstance(guids, int):
+ guids = [guids]
+
while not all([self.state(guid) in [ResourceState.FINISHED,
ResourceState.STOPPED,
ResourceState.FAILED] \
time.sleep(2)
def get_task(self, tid):
+ """ Get a specific task
+
+ :param tid: Id of the task
+ :type tid: int
+ :rtype: unknow
+ """
return self._tasks.get(tid)
def get_resource(self, guid):
+ """ Get a specific Resource Manager
+
+ :param guid: Id of the task
+ :type guid: int
+ :rtype: ResourceManager
+ """
return self._resources.get(guid)
@property
def resources(self):
+ """ Returns the list of all the Resource Manager Id
+
+ :rtype: set
+ """
return self._resources.keys()
def register_resource(self, rtype, guid = None):
+ """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
+ for the RM of type 'rtype' and add it to the list of Resources.
+
+ :param rtype: Type of the RM
+ :type rtype: str
+ :return : Id of the RM
+ :rtype: int
+ """
# Get next available guid
guid = self._guid_generator.next(guid)
return guid
def get_attributes(self, guid):
+ """ Return all the attibutes of a specific RM
+
+ :param guid: Guid of the RM
+ :type guid: int
+ :return : List of attributes
+ :rtype: list
+ """
rm = self.get_resource(guid)
return rm.get_attributes()
def register_connection(self, guid1, guid2):
+ """ Registers a guid1 with a guid2.
+ The declaration order is not important
+
+ :param guid1: First guid to connect
+ :type guid1: ResourceManager
+
+ :param guid2: Second guid to connect
+ :type guid: ResourceManager
+
+ """
rm1 = self.get_resource(guid1)
rm2 = self.get_resource(guid2)
return rm.trace(name, attr, block, offset)
def discover(self, guid):
+ """ Discover a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.discover()
def provision(self, guid):
+ """ Provision a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.provision()
def get(self, guid, name):
+ """ Get a specific attribute 'name' from the RM 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: attribute's name
+ :type name: str
+
+ """
rm = self.get_resource(guid)
return rm.get(name)
def set(self, guid, name, value):
+ """ Set a specific attribute 'name' from the RM 'guid'
+ with the value 'value'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: attribute's name
+ :type name: str
+
+ :param value: attribute's value
+
+ """
rm = self.get_resource(guid)
return rm.set(name, value)
return rm.state
def stop(self, guid):
+ """ Stop a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.stop()
def start(self, guid):
+ """ Start a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.start()
rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
+ """ Stop a specific RM defined by its 'guid' only if all the conditions are true
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.stop_with_conditions()
def start_with_conditions(self, guid):
+ """ Start a specific RM defined by its 'guid' only if all the conditions are true
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.start_with_condition()
def release(self, group = None):
+ """ Release the elements of the list 'group' or
+ all the resources if any group is specified
+
+ :param group: List of RM
+ :type group: list
+
+ """
if not group:
group = self.resources
threads.remove(thread)
def shutdown(self):
+ """ Shutdown the Experiment Controller.
+ It means : Release all the resources and stop the scheduler
+
+ """
self.release()
self._stop_scheduler()
def schedule(self, date, callback, track = False):
""" Schedule a callback to be executed at time date.
- date string containing execution time for the task.
+ :param date: string containing execution time for the task.
It can be expressed as an absolute time, using
timestamp format, or as a relative time matching
^\d+.\d+(h|m|s|ms|us)$
- callback code to be executed for the task. Must be a
+ :param callback: code to be executed for the task. Must be a
Python function, and receives args and kwargs
as arguments.
- track if set to True, the task will be retrivable with
+ :param track: if set to True, the task will be retrivable with
the get_task() method
+
+ :return : The Id of the task
"""
timestamp = strfvalid(date)
return task.id
def _process(self):
+ """ Process at executing the task that are in the scheduler.
+
+ """
+
runner = ParallelRun(maxthreads = 50)
runner.start()
self._state = ECState.TERMINATED
def _execute(self, task):
+ """ Invoke the callback of the task 'task'
+
+ :param task: Id of the task
+ :type task: int
+
+ """
# Invoke callback
task.status = TaskStatus.DONE
raise
def _stop_scheduler(self):
+ """ Stop the scheduler and put the EC into a FAILED State.
+
+ """
+
# Mark the EC as failed
self._state = ECState.FAILED