+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import functools
import logging
import os
import random
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
from nepi.execution.resource import ResourceFactory, ResourceAction, \
- ResourceState
+ ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
class ECState(object):
+ """ State of the Experiment Controller
+
+ """
RUNNING = 1
FAILED = 2
TERMINATED = 3
class ExperimentController(object):
+ """
+ .. class:: Class Args :
+
+ :param exp_id: Id of the experiment
+ :type exp_id: int
+ :param root_dir: Root directory of the experiment
+ :type root_dir: str
+
+ .. note::
+
+ This class is the only one used by the User. Indeed, the user "talks"
+ only with the Experiment Controller and this latter forward to
+ the different Resources Manager the order provided by the user.
+
+ """
+
def __init__(self, exp_id = None, root_dir = "/tmp"):
super(ExperimentController, self).__init__()
# root directory to store files
@property
def logger(self):
+ """ Return the logger of the Experiment Controller
+
+ """
return self._logger
@property
def ecstate(self):
+ """ Return the state of the Experiment Controller
+
+ """
return self._state
@property
def exp_id(self):
+ """ Return the experiment ID
+
+ """
exp_id = self._exp_id
if not exp_id.startswith("nepi-"):
exp_id = "nepi-" + exp_id
@property
def finished(self):
+ """ Put the state of the Experiment Controller into a final state :
+ Either TERMINATED or FAILED
+
+ """
return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
def wait_finished(self, guids):
- while not all([self.state(guid) == ResourceState.FINISHED \
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reach the state FINISHED
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ if isinstance(guids, int):
+ guids = [guids]
+
+ while not all([self.state(guid) in [ResourceState.FINISHED,
+ ResourceState.STOPPED,
+ ResourceState.FAILED] \
for guid in guids]) and not self.finished:
# We keep the sleep as large as possible to
# decrese the number of RM state requests
time.sleep(2)
def get_task(self, tid):
+ """ Get a specific task
+
+ :param tid: Id of the task
+ :type tid: int
+ :rtype: unknow
+ """
return self._tasks.get(tid)
def get_resource(self, guid):
+ """ Get a specific Resource Manager
+
+ :param guid: Id of the task
+ :type guid: int
+ :rtype: ResourceManager
+ """
return self._resources.get(guid)
@property
def resources(self):
+ """ Returns the list of all the Resource Manager Id
+
+ :rtype: set
+ """
return self._resources.keys()
def register_resource(self, rtype, guid = None):
+ """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
+ for the RM of type 'rtype' and add it to the list of Resources.
+
+ :param rtype: Type of the RM
+ :type rtype: str
+ :return : Id of the RM
+ :rtype: int
+ """
# Get next available guid
guid = self._guid_generator.next(guid)
return guid
def get_attributes(self, guid):
- rm = self.get_resource(guid)
- return rm.get_attributes()
+ """ Return all the attibutes of a specific RM
- def get_filters(self, guid):
+ :param guid: Guid of the RM
+ :type guid: int
+ :return : List of attributes
+ :rtype: list
+ """
rm = self.get_resource(guid)
- return rm.get_filters()
+ return rm.get_attributes()
def register_connection(self, guid1, guid2):
+ """ Registers a guid1 with a guid2.
+ The declaration order is not important
+
+ :param guid1: First guid to connect
+ :type guid1: ResourceManager
+
+ :param guid2: Second guid to connect
+ :type guid: ResourceManager
+
+ """
rm1 = self.get_resource(guid1)
rm2 = self.get_resource(guid2)
rm = self.get_resource(guid)
return rm.trace(name, attr, block, offset)
- def discover(self, guid, filters):
+ def discover(self, guid):
+ """ Discover a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
- return rm.discover(filters)
+ return rm.discover()
+
+ def provision(self, guid):
+ """ Provision a specific RM defined by its 'guid'
- def provision(self, guid, filters):
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
- return rm.provision(filters)
+ return rm.provision()
def get(self, guid, name):
+ """ Get a specific attribute 'name' from the RM 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: attribute's name
+ :type name: str
+
+ """
rm = self.get_resource(guid)
return rm.get(name)
def set(self, guid, name, value):
+ """ Set a specific attribute 'name' from the RM 'guid'
+ with the value 'value'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: attribute's name
+ :type name: str
+
+ :param value: attribute's value
+
+ """
rm = self.get_resource(guid)
return rm.set(name, value)
- def state(self, guid):
+ def state(self, guid, hr = False):
+ """ Returns the state of a resource
+
+ :param guid: Resource guid
+ :type guid: integer
+
+ :param hr: Human readable. Forces return of a
+ status string instead of a number
+ :type hr: boolean
+
+ """
rm = self.get_resource(guid)
+ if hr:
+ return ResourceState2str.get(rm.state)
+
return rm.state
def stop(self, guid):
+ """ Stop a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.stop()
def start(self, guid):
+ """ Start a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.start()
rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
+ """ Stop a specific RM defined by its 'guid' only if all the conditions are true
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.stop_with_conditions()
def start_with_conditions(self, guid):
+ """ Start a specific RM defined by its 'guid' only if all the conditions are true
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.start_with_condition()
"""
self.logger.debug(" ------- DEPLOY START ------ ")
- stop = []
-
- def steps(rm):
- try:
- rm.deploy()
- rm.start_with_conditions()
-
- # Only if the RM has STOP conditions we
- # schedule a stop. Otherwise the RM will stop immediately
- if rm.conditions.get(ResourceAction.STOP):
- rm.stop_with_conditions()
- except:
- import traceback
- err = traceback.format_exc()
-
- self._logger.error("Error occurred while deploying resources: %s" % err)
-
- # stop deployment
- stop.append(None)
-
if not group:
group = self.resources
+ if isinstance(group, int):
+ group = [group]
+
# Before starting deployment we disorder the group list with the
# purpose of speeding up the whole deployment process.
# It is likely that the user inserted in the 'group' list closely
- # resources resources one after another (e.g. all applications
+ # resources one after another (e.g. all applications
# connected to the same node can likely appear one after another).
# This can originate a slow down in the deployment since the N
# threads the parallel runner uses to processes tasks may all
# be taken up by the same family of resources waiting for the
- # same conditions.
- # If we disorder the group list, this problem can be mitigated
- random.shuffle(group)
+ # same conditions (e.g. LinuxApplications running on a same
+ # node share a single lock, so they will tend to be serialized).
+ # If we disorder the group list, this problem can be mitigated.
+ #random.shuffle(group)
+
+ def wait_all_and_start(group):
+ reschedule = False
+ for guid in group:
+ rm = self.get_resource(guid)
+ if rm.state < ResourceState.READY:
+ reschedule = True
+ break
+
+ if reschedule:
+ callback = functools.partial(wait_all_and_start, group)
+ self.schedule("1s", callback)
+ else:
+ # If all resources are read, we schedule the start
+ for guid in group:
+ rm = self.get_resource(guid)
+ self.schedule("0s", rm.start_with_conditions)
+
+ if wait_all_ready:
+ # Schedule the function that will check all resources are
+ # READY, and only then it will schedule the start.
+ # This is aimed to reduce the number of tasks looping in the scheduler.
+ # Intead of having N start tasks, we will have only one
+ callback = functools.partial(wait_all_and_start, group)
+ self.schedule("1s", callback)
- threads = []
for guid in group:
rm = self.get_resource(guid)
+ self.schedule("0s", rm.deploy)
- if wait_all_ready:
- towait = list(group)
- towait.remove(guid)
- self.register_condition(guid, ResourceAction.START,
- towait, ResourceState.READY)
+ if not wait_all_ready:
+ self.schedule("1s", rm.start_with_conditions)
- thread = threading.Thread(target = steps, args = (rm,))
- threads.append(thread)
- thread.setDaemon(True)
- thread.start()
-
- while list(threads) and not self.finished and not stop:
- thread = threads[0]
- # Time out after 5 seconds to check EC not terminated
- thread.join(1)
- if not thread.is_alive():
- threads.remove(thread)
+ if rm.conditions.get(ResourceAction.STOP):
+ # Only if the RM has STOP conditions we
+ # schedule a stop. Otherwise the RM will stop immediately
+ self.schedule("2s", rm.stop_with_conditions)
- if stop:
- # stop the scheduler
- self._stop_scheduler()
- if self._thread.is_alive():
- self._thread.join()
+ def release(self, group = None):
+ """ Release the elements of the list 'group' or
+ all the resources if any group is specified
- raise RuntimeError, "Error occurred, interrupting deployment "
+ :param group: List of RM
+ :type group: list
- def release(self, group = None):
+ """
if not group:
group = self.resources
thread.join(5)
if not thread.is_alive():
threads.remove(thread)
-
+
def shutdown(self):
+ """ Shutdown the Experiment Controller.
+ Releases all the resources and stops task processing thread
+
+ """
self.release()
- self._stop_scheduler()
+ # Mark the EC state as TERMINATED
+ self._state = ECState.TERMINATED
+
+ # Notify condition to wake up the processing thread
+ self._notify()
if self._thread.is_alive():
self._thread.join()
def schedule(self, date, callback, track = False):
""" Schedule a callback to be executed at time date.
- date string containing execution time for the task.
+ :param date: string containing execution time for the task.
It can be expressed as an absolute time, using
timestamp format, or as a relative time matching
^\d+.\d+(h|m|s|ms|us)$
- callback code to be executed for the task. Must be a
+ :param callback: code to be executed for the task. Must be a
Python function, and receives args and kwargs
as arguments.
- track if set to True, the task will be retrivable with
+ :param track: if set to True, the task will be retrivable with
the get_task() method
+
+ :return : The Id of the task
"""
timestamp = strfvalid(date)
self._tasks[task.id] = task
# Notify condition to wake up the processing thread
- self._cond.acquire()
- self._cond.notify()
- self._cond.release()
+ self._notify()
return task.id
def _process(self):
- runner = ParallelRun(maxthreads = 50)
+ """ Process scheduled tasks.
+
+ The _process method is executed in an independent thread held by the
+ ExperimentController for as long as the experiment is running.
+
+ Tasks are scheduled by invoking the schedule method with a target callback.
+ The schedule method is givedn a execution time which controls the
+ order in which tasks are processed.
+
+ Tasks are processed in parallel using multithreading.
+ The environmental variable NEPI_NTHREADS can be used to control
+ the number of threads used to process tasks. The default value is 50.
+
+ """
+ nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+
+ runner = ParallelRun(maxthreads = nthreads)
runner.start()
try:
self._cond.acquire()
task = self._scheduler.next()
self._cond.release()
-
+
if not task:
# It there are not tasks in the tasks queue we need to
# wait until a call to schedule wakes us up
else:
# Process tasks in parallel
runner.put(self._execute, task)
-
except:
import traceback
err = traceback.format_exc()
self._logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
- finally:
+ finally:
runner.sync()
-
- # Mark EC state as terminated
- if self.ecstate == ECState.RUNNING:
- self._state = ECState.TERMINATED
def _execute(self, task):
+ """ Executes a single task.
+
+ If the invokation of the task callback raises an
+ exception, the processing thread of the ExperimentController
+ will be stopped and the experiment will be aborted.
+
+ :param task: Object containing the callback to execute
+ :type task: Task
+
+ """
# Invoke callback
task.status = TaskStatus.DONE
self._logger.error("Error occurred while executing task: %s" % err)
- self._stop_scheduler()
+ # Set the EC to FAILED state (this will force to exit the task
+ # processing thread)
+ self._state = ECState.FAILED
+
+ # Notify condition to wake up the processing thread
+ self._notify()
# Propage error to the ParallelRunner
raise
- def _stop_scheduler(self):
- # Mark the EC as failed
- self._state = ECState.FAILED
-
- # Wake up the EC in case it was sleeping
+ def _notify(self):
+ """ Awakes the processing thread in case it is blocked waiting
+ for a new task to be scheduled.
+ """
self._cond.acquire()
self._cond.notify()
self._cond.release()
-