### Attribute Flags
class Flags:
+ """ Differents flags to characterize an attribute
+
+ """
# Attribute can be modified by the user
NoFlags = 0x00
# Attribute is not modifiable by the user
Filter = 0x08
class Attribute(object):
+ """
+ .. class:: Class Args :
+
+ :param name: Name of the attribute
+ :type name: str
+ :param help: Help about the attribute
+ :type help: str
+ :param type: type of the attribute
+ :type type: str
+ :param flags: Help about the attribute
+ :type flags: str
+ :param default: Default value of the attribute
+ :type default: str
+ :param allowed: Allowed value for this attribute
+ :type allowed: str
+ :param range: Range of the attribute
+ :type range: str
+ :param set_hook: hook that is related with this attribute
+ :type set_hook: str
+
+ """
def __init__(self, name, help, type = Types.String,
flags = Flags.NoFlags, default = None, allowed = None,
range = None, set_hook = None):
@property
def name(self):
+ """ Returns the name of the attribute """
return self._name
@property
def default(self):
+ """ Returns the default value of the attribute """
return self._default
@property
def type(self):
+ """ Returns the type of the attribute """
return self._type
@property
def help(self):
+ """ Returns the help of the attribute """
return self._help
@property
def flags(self):
+ """ Returns the flags of the attribute """
return self._flags
@property
def allowed(self):
+ """ Returns the allowed value for this attribute """
return self._allowed
@property
def range(self):
+ """ Returns the range of the attribute """
return self._range
def has_flag(self, flag):
+ """ Returns true if the attribute has the flag 'flag'
+
+ :param flag: Flag that need to be ckecked
+ :type flag: Flags
+ """
return (self._flags & flag) == flag
def get_value(self):
+ """ Returns the value of the attribute """
return self._value
def set_value(self, value):
+ """ Change the value of the attribute after checking the type """
valid = True
if self.type == Types.Enumerate:
# TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
class ECState(object):
+ """ State of the Experiment Controller
+
+ """
RUNNING = 1
FAILED = 2
TERMINATED = 3
class ExperimentController(object):
+ """
+ .. class:: Class Args :
+
+ :param exp_id: Id of the experiment
+ :type exp_id: int
+ :param root_dir: Root directory of the experiment
+ :type root_dir: str
+
+ .. note::
+
+ This class is the only one used by the User. Indeed, the user "talks"
+ only with the Experiment Controller and this latter forward to
+ the different Resources Manager the order provided by the user.
+
+ """
+
def __init__(self, exp_id = None, root_dir = "/tmp"):
super(ExperimentController, self).__init__()
# root directory to store files
@property
def logger(self):
+ """ Return the logger of the Experiment Controller
+
+ """
return self._logger
@property
def ecstate(self):
+ """ Return the state of the Experiment Controller
+
+ """
return self._state
@property
def exp_id(self):
+ """ Return the experiment ID
+
+ """
exp_id = self._exp_id
if not exp_id.startswith("nepi-"):
exp_id = "nepi-" + exp_id
@property
def finished(self):
+ """ Put the state of the Experiment Controller into a final state :
+ Either TERMINATED or FAILED
+
+ """
return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
def wait_finished(self, guids):
- # Take into account if only one guids is given in parameter
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reach the state FINISHED
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ if isinstance(guids, int):
+ guids = [guids]
+
while not all([self.state(guid) in [ResourceState.FINISHED,
ResourceState.STOPPED,
ResourceState.FAILED] \
time.sleep(2)
def get_task(self, tid):
+ """ Get a specific task
+
+ :param tid: Id of the task
+ :type tid: int
+ :rtype: unknow
+ """
return self._tasks.get(tid)
def get_resource(self, guid):
+ """ Get a specific Resource Manager
+
+ :param guid: Id of the task
+ :type guid: int
+ :rtype: ResourceManager
+ """
return self._resources.get(guid)
@property
def resources(self):
+ """ Returns the list of all the Resource Manager Id
+
+ :rtype: set
+ """
return self._resources.keys()
def register_resource(self, rtype, guid = None):
+ """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
+ for the RM of type 'rtype' and add it to the list of Resources.
+
+ :param rtype: Type of the RM
+ :type rtype: str
+ :return : Id of the RM
+ :rtype: int
+ """
# Get next available guid
guid = self._guid_generator.next(guid)
return guid
def get_attributes(self, guid):
+ """ Return all the attibutes of a specific RM
+
+ :param guid: Guid of the RM
+ :type guid: int
+ :return : List of attributes
+ :rtype: list
+ """
rm = self.get_resource(guid)
return rm.get_attributes()
def register_connection(self, guid1, guid2):
+ """ Registers a guid1 with a guid2.
+ The declaration order is not important
+
+ :param guid1: First guid to connect
+ :type guid1: ResourceManager
+
+ :param guid2: Second guid to connect
+ :type guid: ResourceManager
+
+ """
rm1 = self.get_resource(guid1)
rm2 = self.get_resource(guid2)
return rm.trace(name, attr, block, offset)
def discover(self, guid):
+ """ Discover a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.discover()
def provision(self, guid):
+ """ Provision a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.provision()
def get(self, guid, name):
+ """ Get a specific attribute 'name' from the RM 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: attribute's name
+ :type name: str
+
+ """
rm = self.get_resource(guid)
return rm.get(name)
def set(self, guid, name, value):
+ """ Set a specific attribute 'name' from the RM 'guid'
+ with the value 'value'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ :param name: attribute's name
+ :type name: str
+
+ :param value: attribute's value
+
+ """
rm = self.get_resource(guid)
return rm.set(name, value)
return rm.state
def stop(self, guid):
+ """ Stop a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.stop()
def start(self, guid):
+ """ Start a specific RM defined by its 'guid'
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.start()
rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
+ """ Stop a specific RM defined by its 'guid' only if all the conditions are true
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.stop_with_conditions()
def start_with_conditions(self, guid):
+ """ Start a specific RM defined by its 'guid' only if all the conditions are true
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
rm = self.get_resource(guid)
return rm.start_with_condition()
def release(self, group = None):
+ """ Release the elements of the list 'group' or
+ all the resources if any group is specified
+
+ :param group: List of RM
+ :type group: list
+
+ """
if not group:
group = self.resources
threads.remove(thread)
def shutdown(self):
+ """ Shutdown the Experiment Controller.
+ It means : Release all the resources and stop the scheduler
+
+ """
self.release()
self._stop_scheduler()
def schedule(self, date, callback, track = False):
""" Schedule a callback to be executed at time date.
- date string containing execution time for the task.
+ :param date: string containing execution time for the task.
It can be expressed as an absolute time, using
timestamp format, or as a relative time matching
^\d+.\d+(h|m|s|ms|us)$
- callback code to be executed for the task. Must be a
+ :param callback: code to be executed for the task. Must be a
Python function, and receives args and kwargs
as arguments.
- track if set to True, the task will be retrivable with
+ :param track: if set to True, the task will be retrivable with
the get_task() method
+
+ :return : The Id of the task
"""
timestamp = strfvalid(date)
return task.id
def _process(self):
+ """ Process at executing the task that are in the scheduler.
+
+ """
+
runner = ParallelRun(maxthreads = 50)
runner.start()
self._state = ECState.TERMINATED
def _execute(self, task):
+ """ Invoke the callback of the task 'task'
+
+ :param task: Id of the task
+ :type task: int
+
+ """
# Invoke callback
task.status = TaskStatus.DONE
raise
def _stop_scheduler(self):
+ """ Stop the scheduler and put the EC into a FAILED State.
+
+ """
+
# Mark the EC as failed
self._state = ECState.FAILED
reschedule_delay = "0.5s"
class ResourceAction:
+ """ Action that a user can order to a Resource Manager
+
+ """
DEPLOY = 0
START = 1
STOP = 2
class ResourceState:
+ """ State of a Resource Manager
+
+ """
NEW = 0
DISCOVERED = 1
PROVISIONED = 2
@classmethod
def rtype(cls):
+ """ Returns the type of the Resource Manager
+
+ """
return cls._rtype
@classmethod
@property
def guid(self):
+ """ Returns the guid of the current RM """
return self._guid
@property
def ec(self):
+ """ Returns the Experiment Controller """
return self._ec()
@property
def connections(self):
+ """ Returns the set of connection for this RM"""
return self._connections
@property
def conditions(self):
+ """ Returns the list of conditions for this RM
+ The list is a dictionary with for each action, a list of tuple
+ describing the conditions. """
return self._conditions
@property
@property
def state(self):
+ """ Get the state of the current RM """
return self._state
def log_message(self, msg):
+ """ Improve debugging message by adding more information
+ as the guid and the type of the RM
+
+ :param msg: Message to log
+ :type msg: str
+ :rtype: str
+ """
return " %s guid: %d - %s " % (self._rtype, self.guid, msg)
def connect(self, guid):
+ """ Connect the current RM with the RM 'guid'
+
+ :param guid: Guid of the RM the current RM will be connected
+ :type guid: int
+ """
if self.valid_connection(guid):
self._connections.add(guid)
def discover(self):
+ """ Discover the Resource. As it is specific for each RM,
+ this method take the time when the RM become DISCOVERED and
+ change the status """
self._discover_time = strfnow()
self._state = ResourceState.DISCOVERED
def provision(self):
+ """ Provision the Resource. As it is specific for each RM,
+ this method take the time when the RM become PROVISIONNED and
+ change the status """
self._provision_time = strfnow()
self._state = ResourceState.PROVISIONED
def start(self):
- """ Start the Resource Manager
+ """ Start the Resource Manager. As it is specific to each RM, this methods
+ just change, after some verifications, the status to STARTED and save the time.
"""
if not self._state in [ResourceState.READY, ResourceState.STOPPED]:
self._state = ResourceState.STARTED
def stop(self):
- """ Stop the Resource Manager
+ """ Stop the Resource Manager. As it is specific to each RM, this methods
+ just change, after some verifications, the status to STOPPED and save the time.
"""
if not self._state in [ResourceState.STARTED]:
conditions.append((group, state, time))
def get_connected(self, rtype):
+ """ Return the list of RM with the type 'rtype'
+
+ :param rtype: Type of the RM we look for
+ :type rtype: str
+ :return : list of guid
+ """
connected = []
for guid in self.connections:
rm = self.ec.get_resource(guid)
self._state = ResourceState.RELEASED
def valid_connection(self, guid):
- """Check if the connection is available.
+ """Check if the connection is available. This method need to be
+ redefined by each new Resource Manager.
:param guid: Guid of the current Resource Manager
:type guid: int
@classmethod
def resource_types(cls):
+ """Return the type of the Class"""
return cls._resource_types
@classmethod
def register_type(cls, rclass):
+ """Register a new Ressource Manager"""
cls._resource_types[rclass.rtype()] = rclass
@classmethod
def create(cls, rtype, ec, guid):
+ """Create a new instance of a Ressource Manager"""
rclass = cls._resource_types[rtype]
return rclass(ec, guid)
def populate_factory():
+ """Register all the possible RM that exists in the current version of Nepi.
+
+ """
for rclass in find_types():
ResourceFactory.register_type(rclass)
def find_types():
+ """Look into the different folders to find all the
+ availables Resources Managers
+
+ """
search_path = os.environ.get("NEPI_SEARCH_PATH", "")
search_path = set(search_path.split(" "))
class Task(object):
+ """ This class is to define a task, that is represented by an id,
+ an execution time 'timestamp' and an action 'callback """
+
def __init__(self, timestamp, callback):
self.id = None
self.timestamp = timestamp
self.status = TaskStatus.NEW
class HeapScheduler(object):
- """ This class is thread safe.
+ """ Create a Heap Scheduler.
+
+ .. note::
+
+ This class is thread safe.
All calls to C Extensions are made atomic by the GIL in the CPython implementation.
heapq.heappush, heapq.heappop, and list access are therefore thread-safe """
self._idgen = itertools.count(1)
def schedule(self, task):
+ """ Add the task 'task' in the heap of the scheduler
+
+ :param task: task that need to be schedule
+ :type task: task
+ """
if task.id == None:
task.id = self._idgen.next()
entry = (task.timestamp, task.id, task)
return task
def remove(self, tid):
+ """ Remove a task form the heap
+
+ :param tid: Id of the task that need to be removed
+ :type tid: int
+ """
try:
self._valid.remove(tid)
except:
pass
def next(self):
+ """ Get the next task in the scheduler
+
+ """
while self._queue:
try:
timestamp, tid, task = heapq.heappop(self._queue)
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
class TraceAttr:
+ """ Class representing the different attributes
+ that can characterized a trace.
+
+ """
ALL = 'all'
STREAM = 'stream'
PATH = 'path'
SIZE = 'size'
class Trace(object):
+ """
+ .. class:: Class Args :
+
+ :param name: Name of the trace
+ :type name: str
+ :param help: Help about the trace
+ :type help: str
+
+ """
+
def __init__(self, name, help):
self._name = name
self._help = help
@property
def name(self):
+ """ Returns the name of the trace """
return self._name
@property
def help(self):
+ """ Returns the help of the trace """
return self._help
.. class:: Class Args :
:param sliceid: Slice Name (= Xmpp Slice)
- :type expid: Str
+ :type expid: str
:param expid: Experiment ID (= Xmpp User)
- :type expid: Str
+ :type expid: str
.. note::
"""
:param sliceid: Slice Name (= Xmpp Slice)
- :type expid: Str
+ :type expid: str
:param expid: Experiment ID (= Xmpp User)
- :type expid: Str
+ :type expid: str
"""
self._slice_id = sliceid
.. class:: Class Args :
:param slice: Xmpp Slice
- :type slice: Str
+ :type slice: str
:param host: Xmpp Server
- :type host: Str
+ :type host: str
:param port: Xmpp Port
- :type port: Str
+ :type port: str
:param password: Xmpp password
- :type password: Str
+ :type password: str
:param xmpp_root: Root of the Xmpp Topic Architecture
- :type xmpp_root: Str
+ :type xmpp_root: str
.. note::
"""
:param slice: Xmpp Slice
- :type slice: Str
+ :type slice: str
:param host: Xmpp Server
- :type host: Str
+ :type host: str
:param port: Xmpp Port
- :type port: Str
+ :type port: str
:param password: Xmpp password
- :type password: Str
+ :type password: str
:param xmpp_root: Root of the Xmpp Topic Architecture
- :type xmpp_root: Str
+ :type xmpp_root: str
"""
super(OMFAPI, self).__init__("OMFAPI")
import traceback
import xml.etree.ElementTree as ET
-# inherit from BaseXmpp and XMLStream classes
+# inherit from BaseXmpp and XMLstream classes
class OMFClient(sleekxmpp.ClientXMPP, Logger):
"""
.. class:: Class Args :
:param jid: Jabber Id (= Xmpp Slice + Date)
- :type jid: Str
+ :type jid: str
:param password: Jabber Password (= Xmpp Password)
- :type password: Str
+ :type password: str
.. note::
"""
:param jid: Jabber Id (= Xmpp Slice + Date)
- :type jid: Str
+ :type jid: str
:param password: Jabber Password (= Xmpp Password)
- :type password: Str
+ :type password: str
"""