"""
.. class:: Class Args :
- :param exp_id: Id of the experiment
+ :param exp_id: Human readable identifier for the experiment.
+ It will be used in the name of the directory
+ where experiment related information is stored
:type exp_id: int
- :param root_dir: Root directory of the experiment
+
+ :param root_dir: Root directory where experiment specific folder
+ will be created to store experiment information
:type root_dir: str
.. note::
- This class is the only one used by the User. Indeed, the user "talks"
- only with the Experiment Controller and this latter forward to
- the different Resources Manager the order provided by the user.
+ The ExperimentController (EC), is the entity responsible for
+ managing a single experiment.
+ Through the EC interface the user can create ResourceManagers (RMs),
+ configure them and interconnect them, in order to describe the experiment.
+
+ Only when the 'deploy()' method is invoked, the EC will take actions
+ to transform the 'described' experiment into a 'running' experiment.
+
+ While the experiment is running, it is possible to continue to
+ create/configure/connect RMs, and to deploy them to involve new
+ resources in the experiment.
"""
def wait_finished(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reach the state FINISHED
+ reached the state FINISHED
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ return self.wait(guids)
+
+ def wait_started(self, guids):
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reached the state STARTED
+ :param guids: List of guids
+ :type guids: list
+ """
+ return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
+
+ def wait(self, guids, states = [ResourceState.FINISHED]):
+ """ Blocking method that waits until all the RM from the 'guid' list
+ reached state 'state' or until a failure occurs
+
:param guids: List of guids
:type guids: list
"""
if isinstance(guids, int):
guids = [guids]
- while not all([self.state(guid) in [ResourceState.FINISHED,
- ResourceState.STOPPED,
- ResourceState.FAILED] \
- for guid in guids]) and not self.finished:
- # We keep the sleep as large as possible to
- # decrese the number of RM state requests
+ while not all([self.state(guid) in states for guid in guids]) and \
+ not any([self.state(guid) in [
+ ResourceState.STOPPED,
+ ResourceState.FAILED] for guid in guids]) and \
+ not self.finished:
+ # We keep the sleep big to decrease the number of RM state queries
time.sleep(2)
-
+
def get_task(self, tid):
""" Get a specific task
self.logger.debug(" ------- DEPLOY START ------ ")
if not group:
- group = self.resources
-
+ # By default, if not deployment group is indicated,
+ # all RMs that are undeployed will be deployed
+ group = []
+ for guid in self.resources:
+ if self.state(guid) == ResourceState.NEW:
+ group.append(guid)
+
if isinstance(group, int):
group = [group]
ExperimentController for as long as the experiment is running.
Tasks are scheduled by invoking the schedule method with a target callback.
- The schedule method is givedn a execution time which controls the
+ The schedule method is given a execution time which controls the
order in which tasks are processed.
Tasks are processed in parallel using multithreading.
The environmental variable NEPI_NTHREADS can be used to control
the number of threads used to process tasks. The default value is 50.
+ Exception handling:
+
+ To execute tasks in parallel, an ParallelRunner (PR) object, holding
+ a pool of threads (workers), is used.
+ For each available thread in the PR, the next task popped from
+ the scheduler queue is 'put' in the PR.
+ Upon receiving a task to execute, each PR worker (thread) invokes the
+ _execute method of the EC, passing the task as argument.
+ This method, calls task.callback inside a try/except block. If an
+ exception is raised by the tasks.callback, it will be trapped by the
+ try block, logged to standard error (usually the console), and the EC
+ state will be set to ECState.FAILED.
+ The invocation of _notify immediately after, forces the processing
+ loop in the _process method, to wake up if it was blocked waiting for new
+ tasks to arrived, and to check the EC state.
+ As the EC is in FAILED state, the processing loop exits and the
+ 'finally' block is invoked. In the 'finally' block, the 'sync' method
+ of the PR is invoked, which forces the PR to raise any unchecked errors
+ that might have been raised by the workers.
+
"""
nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
self._state = ECState.FAILED
finally:
+ self._logger.info("Exiting the task processing loop ... ")
runner.sync()
def _execute(self, task):