"""
.. class:: Class Args :
- :param exp_id: Id of the experiment
+ :param exp_id: Human readable identifier for the experiment.
+ It will be used in the name of the directory
+ where experiment related information is stored
:type exp_id: int
- :param root_dir: Root directory of the experiment
+
+ :param root_dir: Root directory where experiment specific folder
+ will be created to store experiment information
:type root_dir: str
.. note::
+ The ExperimentController (EC), is the entity responsible for
+ managing a single experiment.
+ Through the EC interface the user can create ResourceManagers (RMs),
+ configure them and interconnect them, in order to describe the experiment.
+
+ Only when the 'deploy()' method is invoked, the EC will take actions
+ to transform the 'described' experiment into a 'running' experiment.
- This class is the only one used by the User. Indeed, the user "talks"
- only with the Experiment Controller and this latter forward to
- the different Resources Manager the order provided by the user.
+ While the experiment is running, it is possible to continue to
+ create/configure/connect RMs, and to deploy them to involve new
+ resources in the experiment.
"""
def wait_finished(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reach the state FINISHED
+ reached the state FINISHED
+ :param guids: List of guids
+ :type guids: list
+ """
+ return self.wait(guids)
+
+ def wait_started(self, guids):
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reached the state STARTED
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ return self.wait(guids, states = [ResourceState.STARTED,
+ ResourceState.STOPPED,
+ ResourceState.FINISHED])
+
+ def wait(self, guids, states = [ResourceState.FINISHED,
+ ResourceState.STOPPED]):
+ """ Blocking method that waits until all the RM from the 'guid' list
+ reached state 'state' or until a failure occurs
+
:param guids: List of guids
:type guids: list
"""
if isinstance(guids, int):
guids = [guids]
- while not all([self.state(guid) in [ResourceState.FINISHED,
- ResourceState.STOPPED,
- ResourceState.FAILED] \
- for guid in guids]) and not self.finished:
- # We keep the sleep as large as possible to
- # decrese the number of RM state requests
+ while not all([self.state(guid) in states for guid in guids]) and \
+ not any([self.state(guid) in [
+ ResourceState.FAILED] for guid in guids]) and \
+ not self.finished:
+ # debug logging
+ waited = ""
+ for guid in guids:
+ waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
+ self.logger.debug(" WAITING FOR %s " % waited )
+
+ # We keep the sleep big to decrease the number of RM state queries
time.sleep(2)
-
+
def get_task(self, tid):
""" Get a specific task
def resources(self):
""" Returns the list of all the Resource Manager Id
- :rtype: set
+ :rtype: set
+
"""
return self._resources.keys()
:param rtype: Type of the RM
:type rtype: str
- :return : Id of the RM
- :rtype: int
+ :return: Id of the RM
+ :rtype: int
"""
# Get next available guid
guid = self._guid_generator.next(guid)
:param guid: Guid of the RM
:type guid: int
- :return : List of attributes
+ :return: List of attributes
:rtype: list
"""
rm = self.get_resource(guid)
:param guid2: Second guid to connect
:type guid: ResourceManager
-
"""
rm1 = self.get_resource(guid1)
rm2 = self.get_resource(guid2)
"""
rm = self.get_resource(guid)
+ state = rm.state
+
if hr:
- return ResourceState2str.get(rm.state)
+ return ResourceState2str.get(state)
- return rm.state
+ return state
def stop(self, guid):
""" Stop a specific RM defined by its 'guid'
self.logger.debug(" ------- DEPLOY START ------ ")
if not group:
- group = self.resources
-
+ # By default, if not deployment group is indicated,
+ # all RMs that are undeployed will be deployed
+ group = []
+ for guid in self.resources:
+ if self.state(guid) == ResourceState.NEW:
+ group.append(guid)
+
if isinstance(group, int):
group = [group]
# same conditions (e.g. LinuxApplications running on a same
# node share a single lock, so they will tend to be serialized).
# If we disorder the group list, this problem can be mitigated.
- #random.shuffle(group)
+ random.shuffle(group)
def wait_all_and_start(group):
reschedule = False
for guid in group:
- rm = self.get_resource(guid)
- if rm.state < ResourceState.READY:
+ if self.state(guid) < ResourceState.READY:
reschedule = True
break
# schedule a stop. Otherwise the RM will stop immediately
self.schedule("2s", rm.stop_with_conditions)
-
def release(self, group = None):
""" Release the elements of the list 'group' or
all the resources if any group is specified
if track:
self._tasks[task.id] = task
-
+
# Notify condition to wake up the processing thread
self._notify()
def _process(self):
""" Process scheduled tasks.
+ .. note::
+
The _process method is executed in an independent thread held by the
ExperimentController for as long as the experiment is running.
Tasks are scheduled by invoking the schedule method with a target callback.
- The schedule method is givedn a execution time which controls the
+ The schedule method is given a execution time which controls the
order in which tasks are processed.
Tasks are processed in parallel using multithreading.
The environmental variable NEPI_NTHREADS can be used to control
the number of threads used to process tasks. The default value is 50.
+ Exception handling:
+
+ To execute tasks in parallel, an ParallelRunner (PR) object, holding
+ a pool of threads (workers), is used.
+ For each available thread in the PR, the next task popped from
+ the scheduler queue is 'put' in the PR.
+ Upon receiving a task to execute, each PR worker (thread) invokes the
+ _execute method of the EC, passing the task as argument.
+ This method, calls task.callback inside a try/except block. If an
+ exception is raised by the tasks.callback, it will be trapped by the
+ try block, logged to standard error (usually the console), and the EC
+ state will be set to ECState.FAILED.
+ The invocation of _notify immediately after, forces the processing
+ loop in the _process method, to wake up if it was blocked waiting for new
+ tasks to arrived, and to check the EC state.
+ As the EC is in FAILED state, the processing loop exits and the
+ 'finally' block is invoked. In the 'finally' block, the 'sync' method
+ of the PR is invoked, which forces the PR to raise any unchecked errors
+ that might have been raised by the workers.
+
"""
nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
try:
while not self.finished:
self._cond.acquire()
+
task = self._scheduler.next()
- self._cond.release()
if not task:
- # It there are not tasks in the tasks queue we need to
- # wait until a call to schedule wakes us up
- self._cond.acquire()
+ # No task to execute. Wait for a new task to be scheduled.
self._cond.wait()
- self._cond.release()
- else:
- # If the task timestamp is in the future the thread needs to wait
- # until time elapse or until another task is scheduled
+ else:
+ # The task timestamp is in the future. Wait for timeout
+ # or until another task is scheduled.
now = strfnow()
if now < task.timestamp:
- # Calculate time difference in seconds
+ # Calculate timeout in seconds
timeout = strfdiff(task.timestamp, now)
+
# Re-schedule task with the same timestamp
self._scheduler.schedule(task)
- # Sleep until timeout or until a new task awakes the condition
- self._cond.acquire()
+
+ task = None
+
+ # Wait timeout or until a new task awakes the condition
self._cond.wait(timeout)
- self._cond.release()
- else:
- # Process tasks in parallel
- runner.put(self._execute, task)
+
+ self._cond.release()
+
+ if task:
+ # Process tasks in parallel
+ runner.put(self._execute, task)
except:
import traceback
err = traceback.format_exc()
- self._logger.error("Error while processing tasks in the EC: %s" % err)
+ self.logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
finally:
+ self.logger.debug("Exiting the task processing loop ... ")
runner.sync()
def _execute(self, task):
""" Executes a single task.
- If the invokation of the task callback raises an
- exception, the processing thread of the ExperimentController
- will be stopped and the experiment will be aborted.
-
:param task: Object containing the callback to execute
:type task: Task
+ .. note::
+
+ If the invokation of the task callback raises an
+ exception, the processing thread of the ExperimentController
+ will be stopped and the experiment will be aborted.
+
"""
# Invoke callback
task.status = TaskStatus.DONE
task.result = err
task.status = TaskStatus.ERROR
- self._logger.error("Error occurred while executing task: %s" % err)
+ self.logger.error("Error occurred while executing task: %s" % err)
# Set the EC to FAILED state (this will force to exit the task
# processing thread)