From 4e41e58c71c09ac4a38f6ecff12d3ae98a9e4154 Mon Sep 17 00:00:00 2001 From: Julien Tribino Date: Mon, 10 Jun 2013 19:00:18 +0200 Subject: [PATCH] Documentation of the execution folder --- src/nepi/execution/attribute.py | 38 ++++++ src/nepi/execution/ec.py | 173 ++++++++++++++++++++++++- src/nepi/execution/resource.py | 59 ++++++++- src/nepi/execution/scheduler.py | 22 +++- src/nepi/execution/trace.py | 16 +++ src/nepi/resources/omf/messages_5_4.py | 8 +- src/nepi/resources/omf/omf_api.py | 20 +-- src/nepi/resources/omf/omf_client.py | 10 +- 8 files changed, 319 insertions(+), 27 deletions(-) diff --git a/src/nepi/execution/attribute.py b/src/nepi/execution/attribute.py index 4dfeab34..3d46edc4 100644 --- a/src/nepi/execution/attribute.py +++ b/src/nepi/execution/attribute.py @@ -27,6 +27,9 @@ class Types: ### Attribute Flags class Flags: + """ Differents flags to characterize an attribute + + """ # Attribute can be modified by the user NoFlags = 0x00 # Attribute is not modifiable by the user @@ -39,6 +42,27 @@ class Flags: Filter = 0x08 class Attribute(object): + """ + .. class:: Class Args : + + :param name: Name of the attribute + :type name: str + :param help: Help about the attribute + :type help: str + :param type: type of the attribute + :type type: str + :param flags: Help about the attribute + :type flags: str + :param default: Default value of the attribute + :type default: str + :param allowed: Allowed value for this attribute + :type allowed: str + :param range: Range of the attribute + :type range: str + :param set_hook: hook that is related with this attribute + :type set_hook: str + + """ def __init__(self, name, help, type = Types.String, flags = Flags.NoFlags, default = None, allowed = None, range = None, set_hook = None): @@ -55,39 +79,53 @@ class Attribute(object): @property def name(self): + """ Returns the name of the attribute """ return self._name @property def default(self): + """ Returns the default value of the attribute """ return self._default @property def type(self): + """ Returns the type of the attribute """ return self._type @property def help(self): + """ Returns the help of the attribute """ return self._help @property def flags(self): + """ Returns the flags of the attribute """ return self._flags @property def allowed(self): + """ Returns the allowed value for this attribute """ return self._allowed @property def range(self): + """ Returns the range of the attribute """ return self._range def has_flag(self, flag): + """ Returns true if the attribute has the flag 'flag' + + :param flag: Flag that need to be ckecked + :type flag: Flags + """ return (self._flags & flag) == flag def get_value(self): + """ Returns the value of the attribute """ return self._value def set_value(self, value): + """ Change the value of the attribute after checking the type """ valid = True if self.type == Types.Enumerate: diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index f2a19251..34c5c50b 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -37,11 +37,30 @@ from nepi.execution.trace import TraceAttr # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!! class ECState(object): + """ State of the Experiment Controller + + """ RUNNING = 1 FAILED = 2 TERMINATED = 3 class ExperimentController(object): + """ + .. class:: Class Args : + + :param exp_id: Id of the experiment + :type exp_id: int + :param root_dir: Root directory of the experiment + :type root_dir: str + + .. note:: + + This class is the only one used by the User. Indeed, the user "talks" + only with the Experiment Controller and this latter forward to + the different Resources Manager the order provided by the user. + + """ + def __init__(self, exp_id = None, root_dir = "/tmp"): super(ExperimentController, self).__init__() # root directory to store files @@ -76,14 +95,23 @@ class ExperimentController(object): @property def logger(self): + """ Return the logger of the Experiment Controller + + """ return self._logger @property def ecstate(self): + """ Return the state of the Experiment Controller + + """ return self._state @property def exp_id(self): + """ Return the experiment ID + + """ exp_id = self._exp_id if not exp_id.startswith("nepi-"): exp_id = "nepi-" + exp_id @@ -91,10 +119,22 @@ class ExperimentController(object): @property def finished(self): + """ Put the state of the Experiment Controller into a final state : + Either TERMINATED or FAILED + + """ return self.ecstate in [ECState.FAILED, ECState.TERMINATED] def wait_finished(self, guids): - # Take into account if only one guids is given in parameter + """ Blocking method that wait until all the RM from the 'guid' list + reach the state FINISHED + + :param guids: List of guids + :type guids: list + """ + if isinstance(guids, int): + guids = [guids] + while not all([self.state(guid) in [ResourceState.FINISHED, ResourceState.STOPPED, ResourceState.FAILED] \ @@ -104,16 +144,40 @@ class ExperimentController(object): time.sleep(2) def get_task(self, tid): + """ Get a specific task + + :param tid: Id of the task + :type tid: int + :rtype: unknow + """ return self._tasks.get(tid) def get_resource(self, guid): + """ Get a specific Resource Manager + + :param guid: Id of the task + :type guid: int + :rtype: ResourceManager + """ return self._resources.get(guid) @property def resources(self): + """ Returns the list of all the Resource Manager Id + + :rtype: set + """ return self._resources.keys() def register_resource(self, rtype, guid = None): + """ Register a Resource Manager. It creates a new 'guid', if it is not specified, + for the RM of type 'rtype' and add it to the list of Resources. + + :param rtype: Type of the RM + :type rtype: str + :return : Id of the RM + :rtype: int + """ # Get next available guid guid = self._guid_generator.next(guid) @@ -126,10 +190,27 @@ class ExperimentController(object): return guid def get_attributes(self, guid): + """ Return all the attibutes of a specific RM + + :param guid: Guid of the RM + :type guid: int + :return : List of attributes + :rtype: list + """ rm = self.get_resource(guid) return rm.get_attributes() def register_connection(self, guid1, guid2): + """ Registers a guid1 with a guid2. + The declaration order is not important + + :param guid1: First guid to connect + :type guid1: ResourceManager + + :param guid2: Second guid to connect + :type guid: ResourceManager + + """ rm1 = self.get_resource(guid1) rm2 = self.get_resource(guid2) @@ -200,18 +281,51 @@ class ExperimentController(object): return rm.trace(name, attr, block, offset) def discover(self, guid): + """ Discover a specific RM defined by its 'guid' + + :param guid: Guid of the RM + :type guid: int + + """ rm = self.get_resource(guid) return rm.discover() def provision(self, guid): + """ Provision a specific RM defined by its 'guid' + + :param guid: Guid of the RM + :type guid: int + + """ rm = self.get_resource(guid) return rm.provision() def get(self, guid, name): + """ Get a specific attribute 'name' from the RM 'guid' + + :param guid: Guid of the RM + :type guid: int + + :param name: attribute's name + :type name: str + + """ rm = self.get_resource(guid) return rm.get(name) def set(self, guid, name, value): + """ Set a specific attribute 'name' from the RM 'guid' + with the value 'value' + + :param guid: Guid of the RM + :type guid: int + + :param name: attribute's name + :type name: str + + :param value: attribute's value + + """ rm = self.get_resource(guid) return rm.set(name, value) @@ -233,10 +347,22 @@ class ExperimentController(object): return rm.state def stop(self, guid): + """ Stop a specific RM defined by its 'guid' + + :param guid: Guid of the RM + :type guid: int + + """ rm = self.get_resource(guid) return rm.stop() def start(self, guid): + """ Start a specific RM defined by its 'guid' + + :param guid: Guid of the RM + :type guid: int + + """ rm = self.get_resource(guid) return rm.start() @@ -278,10 +404,22 @@ class ExperimentController(object): rm.set_with_conditions(name, value, group2, state, time) def stop_with_conditions(self, guid): + """ Stop a specific RM defined by its 'guid' only if all the conditions are true + + :param guid: Guid of the RM + :type guid: int + + """ rm = self.get_resource(guid) return rm.stop_with_conditions() def start_with_conditions(self, guid): + """ Start a specific RM defined by its 'guid' only if all the conditions are true + + :param guid: Guid of the RM + :type guid: int + + """ rm = self.get_resource(guid) return rm.start_with_condition() @@ -353,6 +491,13 @@ class ExperimentController(object): def release(self, group = None): + """ Release the elements of the list 'group' or + all the resources if any group is specified + + :param group: List of RM + :type group: list + + """ if not group: group = self.resources @@ -372,6 +517,10 @@ class ExperimentController(object): threads.remove(thread) def shutdown(self): + """ Shutdown the Experiment Controller. + It means : Release all the resources and stop the scheduler + + """ self.release() self._stop_scheduler() @@ -382,17 +531,19 @@ class ExperimentController(object): def schedule(self, date, callback, track = False): """ Schedule a callback to be executed at time date. - date string containing execution time for the task. + :param date: string containing execution time for the task. It can be expressed as an absolute time, using timestamp format, or as a relative time matching ^\d+.\d+(h|m|s|ms|us)$ - callback code to be executed for the task. Must be a + :param callback: code to be executed for the task. Must be a Python function, and receives args and kwargs as arguments. - track if set to True, the task will be retrivable with + :param track: if set to True, the task will be retrivable with the get_task() method + + :return : The Id of the task """ timestamp = strfvalid(date) @@ -410,6 +561,10 @@ class ExperimentController(object): return task.id def _process(self): + """ Process at executing the task that are in the scheduler. + + """ + runner = ParallelRun(maxthreads = 50) runner.start() @@ -455,6 +610,12 @@ class ExperimentController(object): self._state = ECState.TERMINATED def _execute(self, task): + """ Invoke the callback of the task 'task' + + :param task: Id of the task + :type task: int + + """ # Invoke callback task.status = TaskStatus.DONE @@ -474,6 +635,10 @@ class ExperimentController(object): raise def _stop_scheduler(self): + """ Stop the scheduler and put the EC into a FAILED State. + + """ + # Mark the EC as failed self._state = ECState.FAILED diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 0263ee44..423ca926 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -31,11 +31,17 @@ import weakref reschedule_delay = "0.5s" class ResourceAction: + """ Action that a user can order to a Resource Manager + + """ DEPLOY = 0 START = 1 STOP = 2 class ResourceState: + """ State of a Resource Manager + + """ NEW = 0 DISCOVERED = 1 PROVISIONED = 2 @@ -161,6 +167,9 @@ class ResourceManager(Logger): @classmethod def rtype(cls): + """ Returns the type of the Resource Manager + + """ return cls._rtype @classmethod @@ -202,18 +211,24 @@ class ResourceManager(Logger): @property def guid(self): + """ Returns the guid of the current RM """ return self._guid @property def ec(self): + """ Returns the Experiment Controller """ return self._ec() @property def connections(self): + """ Returns the set of connection for this RM""" return self._connections @property def conditions(self): + """ Returns the list of conditions for this RM + The list is a dictionary with for each action, a list of tuple + describing the conditions. """ return self._conditions @property @@ -248,25 +263,45 @@ class ResourceManager(Logger): @property def state(self): + """ Get the state of the current RM """ return self._state def log_message(self, msg): + """ Improve debugging message by adding more information + as the guid and the type of the RM + + :param msg: Message to log + :type msg: str + :rtype: str + """ return " %s guid: %d - %s " % (self._rtype, self.guid, msg) def connect(self, guid): + """ Connect the current RM with the RM 'guid' + + :param guid: Guid of the RM the current RM will be connected + :type guid: int + """ if self.valid_connection(guid): self._connections.add(guid) def discover(self): + """ Discover the Resource. As it is specific for each RM, + this method take the time when the RM become DISCOVERED and + change the status """ self._discover_time = strfnow() self._state = ResourceState.DISCOVERED def provision(self): + """ Provision the Resource. As it is specific for each RM, + this method take the time when the RM become PROVISIONNED and + change the status """ self._provision_time = strfnow() self._state = ResourceState.PROVISIONED def start(self): - """ Start the Resource Manager + """ Start the Resource Manager. As it is specific to each RM, this methods + just change, after some verifications, the status to STARTED and save the time. """ if not self._state in [ResourceState.READY, ResourceState.STOPPED]: @@ -277,7 +312,8 @@ class ResourceManager(Logger): self._state = ResourceState.STARTED def stop(self): - """ Stop the Resource Manager + """ Stop the Resource Manager. As it is specific to each RM, this methods + just change, after some verifications, the status to STOPPED and save the time. """ if not self._state in [ResourceState.STARTED]: @@ -369,6 +405,12 @@ class ResourceManager(Logger): conditions.append((group, state, time)) def get_connected(self, rtype): + """ Return the list of RM with the type 'rtype' + + :param rtype: Type of the RM we look for + :type rtype: str + :return : list of guid + """ connected = [] for guid in self.connections: rm = self.ec.get_resource(guid) @@ -554,7 +596,8 @@ class ResourceManager(Logger): self._state = ResourceState.RELEASED def valid_connection(self, guid): - """Check if the connection is available. + """Check if the connection is available. This method need to be + redefined by each new Resource Manager. :param guid: Guid of the current Resource Manager :type guid: int @@ -569,22 +612,32 @@ class ResourceFactory(object): @classmethod def resource_types(cls): + """Return the type of the Class""" return cls._resource_types @classmethod def register_type(cls, rclass): + """Register a new Ressource Manager""" cls._resource_types[rclass.rtype()] = rclass @classmethod def create(cls, rtype, ec, guid): + """Create a new instance of a Ressource Manager""" rclass = cls._resource_types[rtype] return rclass(ec, guid) def populate_factory(): + """Register all the possible RM that exists in the current version of Nepi. + + """ for rclass in find_types(): ResourceFactory.register_type(rclass) def find_types(): + """Look into the different folders to find all the + availables Resources Managers + + """ search_path = os.environ.get("NEPI_SEARCH_PATH", "") search_path = set(search_path.split(" ")) diff --git a/src/nepi/execution/scheduler.py b/src/nepi/execution/scheduler.py index 8286d4fe..1b353627 100644 --- a/src/nepi/execution/scheduler.py +++ b/src/nepi/execution/scheduler.py @@ -27,6 +27,9 @@ class TaskStatus: class Task(object): + """ This class is to define a task, that is represented by an id, + an execution time 'timestamp' and an action 'callback """ + def __init__(self, timestamp, callback): self.id = None self.timestamp = timestamp @@ -35,7 +38,11 @@ class Task(object): self.status = TaskStatus.NEW class HeapScheduler(object): - """ This class is thread safe. + """ Create a Heap Scheduler. + + .. note:: + + This class is thread safe. All calls to C Extensions are made atomic by the GIL in the CPython implementation. heapq.heappush, heapq.heappop, and list access are therefore thread-safe """ @@ -46,6 +53,11 @@ class HeapScheduler(object): self._idgen = itertools.count(1) def schedule(self, task): + """ Add the task 'task' in the heap of the scheduler + + :param task: task that need to be schedule + :type task: task + """ if task.id == None: task.id = self._idgen.next() entry = (task.timestamp, task.id, task) @@ -54,12 +66,20 @@ class HeapScheduler(object): return task def remove(self, tid): + """ Remove a task form the heap + + :param tid: Id of the task that need to be removed + :type tid: int + """ try: self._valid.remove(tid) except: pass def next(self): + """ Get the next task in the scheduler + + """ while self._queue: try: timestamp, tid, task = heapq.heappop(self._queue) diff --git a/src/nepi/execution/trace.py b/src/nepi/execution/trace.py index 59d2394e..1d16242e 100644 --- a/src/nepi/execution/trace.py +++ b/src/nepi/execution/trace.py @@ -18,12 +18,26 @@ # Author: Alina Quereilhac class TraceAttr: + """ Class representing the different attributes + that can characterized a trace. + + """ ALL = 'all' STREAM = 'stream' PATH = 'path' SIZE = 'size' class Trace(object): + """ + .. class:: Class Args : + + :param name: Name of the trace + :type name: str + :param help: Help about the trace + :type help: str + + """ + def __init__(self, name, help): self._name = name self._help = help @@ -31,9 +45,11 @@ class Trace(object): @property def name(self): + """ Returns the name of the trace """ return self._name @property def help(self): + """ Returns the help of the trace """ return self._help diff --git a/src/nepi/resources/omf/messages_5_4.py b/src/nepi/resources/omf/messages_5_4.py index f3cde244..b27531ae 100644 --- a/src/nepi/resources/omf/messages_5_4.py +++ b/src/nepi/resources/omf/messages_5_4.py @@ -25,9 +25,9 @@ class MessageHandler(): .. class:: Class Args : :param sliceid: Slice Name (= Xmpp Slice) - :type expid: Str + :type expid: str :param expid: Experiment ID (= Xmpp User) - :type expid: Str + :type expid: str .. note:: @@ -39,9 +39,9 @@ class MessageHandler(): """ :param sliceid: Slice Name (= Xmpp Slice) - :type expid: Str + :type expid: str :param expid: Experiment ID (= Xmpp User) - :type expid: Str + :type expid: str """ self._slice_id = sliceid diff --git a/src/nepi/resources/omf/omf_api.py b/src/nepi/resources/omf/omf_api.py index 4dfcc6fc..9654b68d 100644 --- a/src/nepi/resources/omf/omf_api.py +++ b/src/nepi/resources/omf/omf_api.py @@ -35,15 +35,15 @@ class OMFAPI(Logger): .. class:: Class Args : :param slice: Xmpp Slice - :type slice: Str + :type slice: str :param host: Xmpp Server - :type host: Str + :type host: str :param port: Xmpp Port - :type port: Str + :type port: str :param password: Xmpp password - :type password: Str + :type password: str :param xmpp_root: Root of the Xmpp Topic Architecture - :type xmpp_root: Str + :type xmpp_root: str .. note:: @@ -54,15 +54,15 @@ class OMFAPI(Logger): """ :param slice: Xmpp Slice - :type slice: Str + :type slice: str :param host: Xmpp Server - :type host: Str + :type host: str :param port: Xmpp Port - :type port: Str + :type port: str :param password: Xmpp password - :type password: Str + :type password: str :param xmpp_root: Root of the Xmpp Topic Architecture - :type xmpp_root: Str + :type xmpp_root: str """ super(OMFAPI, self).__init__("OMFAPI") diff --git a/src/nepi/resources/omf/omf_client.py b/src/nepi/resources/omf/omf_client.py index 518b1d84..a9a8cd99 100644 --- a/src/nepi/resources/omf/omf_client.py +++ b/src/nepi/resources/omf/omf_client.py @@ -25,15 +25,15 @@ from sleekxmpp.exceptions import IqError, IqTimeout import traceback import xml.etree.ElementTree as ET -# inherit from BaseXmpp and XMLStream classes +# inherit from BaseXmpp and XMLstream classes class OMFClient(sleekxmpp.ClientXMPP, Logger): """ .. class:: Class Args : :param jid: Jabber Id (= Xmpp Slice + Date) - :type jid: Str + :type jid: str :param password: Jabber Password (= Xmpp Password) - :type password: Str + :type password: str .. note:: @@ -45,9 +45,9 @@ class OMFClient(sleekxmpp.ClientXMPP, Logger): """ :param jid: Jabber Id (= Xmpp Slice + Date) - :type jid: Str + :type jid: str :param password: Jabber Password (= Xmpp Password) - :type password: Str + :type password: str """ -- 2.43.0