X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fresource.py;h=2d738c5d07d270ad85a847f9f01b3cce5942c20d;hb=bf43c83ced9389c8fa9468d7c23f67d35af963da;hp=a7e7758f06ec0761111c07bf79df4b1be7525898;hpb=e41672c333f70c7beb4a9c9c208d77f213092aee;p=nepi.git diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index a7e7758f..2d738c5d 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -1,22 +1,47 @@ -from neco.util.timefuncs import strfnow, strfdiff, strfvalid -from neco.execution.trace import TraceAttr +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat +from nepi.util.logger import Logger +from nepi.execution.trace import TraceAttr import copy import functools -import inspect import logging import os import pkgutil import weakref -reschedule_delay = "0.5s" +reschedule_delay = "1s" class ResourceAction: + """ Action that a user can order to a Resource Manager + + """ DEPLOY = 0 START = 1 STOP = 2 class ResourceState: + """ State of a Resource Manager + + """ NEW = 0 DISCOVERED = 1 PROVISIONED = 2 @@ -27,33 +52,55 @@ class ResourceState: FAILED = 7 RELEASED = 8 +ResourceState2str = dict({ + ResourceState.NEW : "NEW", + ResourceState.DISCOVERED : "DISCOVERED", + ResourceState.PROVISIONED : "PROVISIONED", + ResourceState.READY : "READY", + ResourceState.STARTED : "STARTED", + ResourceState.STOPPED : "STOPPED", + ResourceState.FINISHED : "FINISHED", + ResourceState.FAILED : "FAILED", + ResourceState.RELEASED : "RELEASED", + }) + def clsinit(cls): + """ Initializes template information (i.e. attributes and traces) + for the ResourceManager class + """ cls._clsinit() return cls +def clsinit_copy(cls): + """ Initializes template information (i.e. attributes and traces) + for the ResourceManager class, inheriting attributes and traces + from the parent class + """ + cls._clsinit_copy() + return cls + # Decorator to invoke class initialization method @clsinit -class ResourceManager(object): +class ResourceManager(Logger): _rtype = "Resource" - _filters = None _attributes = None _traces = None @classmethod - def _register_filter(cls, attr): + def _register_attribute(cls, attr): """ Resource subclasses will invoke this method to add a - filter attribute + resource attribute """ - cls._filters[attr.name] = attr + cls._attributes[attr.name] = attr @classmethod - def _register_attribute(cls, attr): - """ Resource subclasses will invoke this method to add a + def _remove_attribute(cls, name): + """ Resource subclasses will invoke this method to remove a resource attribute """ - cls._attributes[attr.name] = attr + del cls._attributes[name] @classmethod def _register_trace(cls, trace): @@ -63,14 +110,13 @@ class ResourceManager(object): """ cls._traces[trace.name] = trace - @classmethod - def _register_filters(cls): - """ Resource subclasses will invoke this method to register - resource filters + def _remove_trace(cls, name): + """ Resource subclasses will invoke this method to remove a + resource trace """ - pass + del cls._traces[name] @classmethod def _register_attributes(cls): @@ -90,16 +136,14 @@ class ResourceManager(object): @classmethod def _clsinit(cls): - """ Create a new dictionnary instance of the dictionnary - with the same template. - - Each ressource should have the same registration dictionary - template with different instances. + """ ResourceManager child classes have different attributes and traces. + Since the templates that hold the information of attributes and traces + are 'class attribute' dictionaries, initially they all point to the + parent class ResourceManager instances of those dictionaries. + In order to make these templates independent from the parent's one, + it is necessary re-initialize the corresponding dictionaries. + This is the objective of the _clsinit method """ - # static template for resource filters - cls._filters = dict() - cls._register_filters() - # static template for resource attributes cls._attributes = dict() cls._register_attributes() @@ -109,15 +153,24 @@ class ResourceManager(object): cls._register_traces() @classmethod - def rtype(cls): - return cls._rtype + def _clsinit_copy(cls): + """ Same as _clsinit, except that it also inherits all attributes and traces + from the parent class. + """ + # static template for resource attributes + cls._attributes = copy.deepcopy(cls._attributes) + cls._register_attributes() + + # static template for resource traces + cls._traces = copy.deepcopy(cls._traces) + cls._register_traces() @classmethod - def get_filters(cls): - """ Returns a copy of the filters + def rtype(cls): + """ Returns the type of the Resource Manager """ - return copy.deepcopy(cls._filters.values()) + return cls._rtype @classmethod def get_attributes(cls): @@ -134,6 +187,8 @@ class ResourceManager(object): return copy.deepcopy(cls._traces.values()) def __init__(self, ec, guid): + super(ResourceManager, self).__init__(self.rtype()) + self._guid = guid self._ec = weakref.ref(ec) self._connections = set() @@ -145,7 +200,9 @@ class ResourceManager(object): # the resource instance gets a copy of all traces self._trcs = copy.deepcopy(self._traces) - self._state = ResourceState.NEW + # Each resource is placed on a deployment group by the EC + # during deployment + self.deployment_group = None self._start_time = None self._stop_time = None @@ -153,123 +210,170 @@ class ResourceManager(object): self._provision_time = None self._ready_time = None self._release_time = None + self._finish_time = None + self._failed_time = None - # Logging - self._logger = logging.getLogger("Resource") - - def debug(self, msg, out = None, err = None): - self.log(msg, logging.DEBUG, out, err) - - def error(self, msg, out = None, err = None): - self.log(msg, logging.ERROR, out, err) - - def warn(self, msg, out = None, err = None): - self.log(msg, logging.WARNING, out, err) - - def info(self, msg, out = None, err = None): - self.log(msg, logging.INFO, out, err) - - def log(self, msg, level, out = None, err = None): - if out: - msg += " - OUT: %s " % out - - if err: - msg += " - ERROR: %s " % err - - msg = self.log_message(msg) - - self.logger.log(level, msg) - - def log_message(self, msg): - return " %s guid: %d - %s " % (self._rtype, self.guid, msg) - - @property - def logger(self): - return self._logger + self._state = ResourceState.NEW @property def guid(self): + """ Returns the global unique identifier of the RM """ return self._guid @property def ec(self): + """ Returns the Experiment Controller """ return self._ec() @property def connections(self): + """ Returns the set of guids of connected RMs""" return self._connections @property def conditions(self): + """ Returns the conditions to which the RM is subjected to. + + The object returned by this method is a dictionary indexed by + ResourceAction.""" return self._conditions @property def start_time(self): - """ Returns timestamp with the time the RM started """ + """ Returns the start time of the RM as a timestamp""" return self._start_time @property def stop_time(self): - """ Returns timestamp with the time the RM stopped """ + """ Returns the stop time of the RM as a timestamp""" return self._stop_time @property def discover_time(self): - """ Returns timestamp with the time the RM passed to state discovered """ + """ Returns the time discovering was finished for the RM as a timestamp""" return self._discover_time @property def provision_time(self): - """ Returns timestamp with the time the RM passed to state provisioned """ + """ Returns the time provisioning was finished for the RM as a timestamp""" return self._provision_time @property def ready_time(self): - """ Returns timestamp with the time the RM passed to state ready """ + """ Returns the time deployment was finished for the RM as a timestamp""" return self._ready_time @property def release_time(self): - """ Returns timestamp with the time the RM was released """ + """ Returns the release time of the RM as a timestamp""" return self._release_time + @property + def finish_time(self): + """ Returns the finalization time of the RM as a timestamp""" + return self._finish_time + + @property + def failed_time(self): + """ Returns the time failure occured for the RM as a timestamp""" + return self._failed_time + @property def state(self): + """ Get the state of the current RM """ return self._state - def connect(self, guid): + def log_message(self, msg): + """ Returns the log message formatted with added information. + + :param msg: text message + :type msg: str + :rtype: str + """ + return " %s guid: %d - %s " % (self._rtype, self.guid, msg) + + def register_connection(self, guid): + """ Registers a connection to the RM identified by guid + + :param guid: Global unique identified of the RM to connect to + :type guid: int + """ if self.valid_connection(guid): + self.connect(guid) self._connections.add(guid) - def discover(self, filters = None): - self._discover_time = strfnow() - self._state = ResourceState.DISCOVERED + def unregister_connection(self, guid): + """ Removes a registered connection to the RM identified by guid - def provision(self, filters = None): - self._provision_time = strfnow() - self._state = ResourceState.PROVISIONED + :param guid: Global unique identified of the RM to connect to + :type guid: int + """ + if guid in self._connections: + self.disconnect(guid) + self._connections.remove(guid) - def start(self): - """ Start the Resource Manager + def discover(self): + """ Performs resource discovery. + + This method is resposible for selecting an individual resource + matching user requirements. + This method should be redefined when necessary in child classes. + """ + self.set_discovered() + def provision(self): + """ Performs resource provisioning. + + This method is resposible for provisioning one resource. + After this method has been successfully invoked, the resource + should be acccesible/controllable by the RM. + This method should be redefined when necessary in child classes. + """ + self.set_provisioned() + + def start(self): + """ Starts the resource. + + There is no generic start behavior for all resources. + This method should be redefined when necessary in child classes. """ - if not self._state in [ResourceState.READY, ResourceState.STOPPED]: + if not self.state in [ResourceState.READY, ResourceState.STOPPED]: self.error("Wrong state %s for start" % self.state) return - self._start_time = strfnow() - self._state = ResourceState.STARTED + self.set_started() def stop(self): - """ Start the Resource Manager - + """ Stops the resource. + + There is no generic stop behavior for all resources. + This method should be redefined when necessary in child classes. """ - if not self._state in [ResourceState.STARTED]: + if not self.state in [ResourceState.STARTED]: self.error("Wrong state %s for stop" % self.state) return + + self.set_stopped() - self._stop_time = strfnow() - self._state = ResourceState.STOPPED + def deploy(self): + """ Execute all steps required for the RM to reach the state READY + + """ + if self.state > ResourceState.READY: + self.error("Wrong state %s for deploy" % self.state) + return + + self.debug("----- READY ---- ") + self.set_ready() + + def release(self): + self.set_released() + + def finish(self): + self.set_finished() + + def fail(self): + self.set_failed() def set(self, name, value): """ Set the value of the attribute @@ -283,7 +387,7 @@ class ResourceManager(object): attr.value = value def get(self, name): - """ Start the Resource Manager + """ Returns the value of the attribute :param name: Name of the attribute :type name: str @@ -292,15 +396,24 @@ class ResourceManager(object): attr = self._attrs[name] return attr.value - def register_trace(self, name): - """ Enable trace + def enable_trace(self, name): + """ Explicitly enable trace generation :param name: Name of the trace :type name: str """ trace = self._trcs[name] trace.enabled = True + + def trace_enabled(self, name): + """Returns True if trace is enables + :param name: Name of the trace + :type name: str + """ + trace = self._trcs[name] + return trace.enabled + def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): """ Get information on collected trace @@ -324,8 +437,7 @@ class ResourceManager(object): """ pass - def register_condition(self, action, group, state, - time = None): + def register_condition(self, action, group, state, time = None): """ Registers a condition on the resource manager to allow execution of 'action' only after 'time' has elapsed from the moment all resources in 'group' reached state 'state' @@ -340,10 +452,11 @@ class ResourceManager(object): :type time: str """ + + if not action in self.conditions: + self._conditions[action] = list() + conditions = self.conditions.get(action) - if not conditions: - conditions = list() - self._conditions[action] = conditions # For each condition to register a tuple of (group, state, time) is # added to the 'action' list @@ -352,11 +465,49 @@ class ResourceManager(object): conditions.append((group, state, time)) - def get_connected(self, rtype): + def unregister_condition(self, group, action = None): + """ Removed conditions for a certain group of guids + + :param action: Action to restrict to condition (either 'START' or 'STOP') + :type action: str + + :param group: Group of RMs to wait for (list of guids) + :type group: int or list of int + + """ + # For each condition a tuple of (group, state, time) is + # added to the 'action' list + if not isinstance(group, list): + group = [group] + + for act, conditions in self.conditions.iteritems(): + if action and act != action: + continue + + for condition in list(conditions): + (grp, state, time) = condition + + # If there is an intersection between grp and group, + # then remove intersected elements + intsec = set(group).intersection(set(grp)) + if intsec: + idx = conditions.index(condition) + newgrp = set(grp) + newgrp.difference_update(intsec) + conditions[idx] = (newgrp, state, time) + + def get_connected(self, rtype = None): + """ Returns the list of RM with the type 'rtype' + + :param rtype: Type of the RM we look for + :type rtype: str + :return: list of guid + """ connected = [] + rclass = ResourceFactory.get_resource_type(rtype) for guid in self.connections: rm = self.ec.get_resource(guid) - if rm.rtype() == rtype: + if not rtype or isinstance(rm, rclass): connected.append(rm) return connected @@ -383,7 +534,7 @@ class ResourceManager(object): for guid in group: rm = self.ec.get_resource(guid) # If the RM state is lower than the requested state we must - # reschedule (e.g. if RM is READY but we required STARTED) + # reschedule (e.g. if RM is READY but we required STARTED). if rm.state < state: reschedule = True break @@ -405,18 +556,23 @@ class ResourceManager(object): # Only keep time information for START and STOP break - d = strfdiff(strfnow(), t) - wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s")) + # time already elapsed since RM changed state + waited = "%fs" % tdiffsec(tnow(), t) + + # time still to wait + wait = tdiffsec(stabsformat(time), stabsformat(waited)) + if wait > 0.001: reschedule = True delay = "%fs" % wait break + return reschedule, delay def set_with_conditions(self, name, value, group, state, time): """ Set value 'value' on attribute with name 'name' when 'time' - has elapsed since all elements in 'group' have reached state - 'state' + has elapsed since all elements in 'group' have reached state + 'state' :param name: Name of the attribute to set :type name: str @@ -428,7 +584,6 @@ class ResourceManager(object): :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str - """ reschedule = False @@ -515,30 +670,25 @@ class ResourceManager(object): callback = functools.partial(self.stop_with_conditions) self.ec.schedule(delay, callback) else: - self.logger.debug(" ----- STOPPING ---- ") + self.debug(" ----- STOPPING ---- ") self.stop() - def deploy(self): - """ Execute all steps required for the RM to reach the state READY - + def connect(self, guid): + """ Performs actions that need to be taken upon associating RMs. + This method should be redefined when necessary in child classes. """ - if self._state > ResourceState.READY: - self.error("Wrong state %s for deploy" % self.state) - return - - self.debug("----- DEPLOYING ---- ") - self._ready_time = strfnow() - self._state = ResourceState.READY - - def release(self): - """Clean the resource at the end of the Experiment and change the status + pass + def disconnect(self, guid): + """ Performs actions that need to be taken upon disassociating RMs. + This method should be redefined when necessary in child classes. """ - self._release_time = strfnow() - self._state = ResourceState.RELEASED + pass def valid_connection(self, guid): - """Check if the connection is available. + """Checks whether a connection with the other RM + is valid. + This method need to be redefined by each new Resource Manager. :param guid: Guid of the current Resource Manager :type guid: int @@ -547,41 +697,102 @@ class ResourceManager(object): """ # TODO: Validate! return True + + def set_started(self): + """ Mark ResourceManager as STARTED """ + self._start_time = tnow() + self._state = ResourceState.STARTED + + def set_stopped(self): + """ Mark ResourceManager as STOPPED """ + self._stop_time = tnow() + self._state = ResourceState.STOPPED + + def set_ready(self): + """ Mark ResourceManager as READY """ + self._ready_time = tnow() + self._state = ResourceState.READY + + def set_released(self): + """ Mark ResourceManager as REALEASED """ + self._release_time = tnow() + self._state = ResourceState.RELEASED + + def set_finished(self): + """ Mark ResourceManager as FINISHED """ + self._finish_time = tnow() + self._state = ResourceState.FINISHED + + def set_failed(self): + """ Mark ResourceManager as FAILED """ + self._failed_time = tnow() + self._state = ResourceState.FAILED + + def set_discovered(self): + """ Mark ResourceManager as DISCOVERED """ + self._discover_time = tnow() + self._state = ResourceState.DISCOVERED + + def set_provisioned(self): + """ Mark ResourceManager as PROVISIONED """ + self._provision_time = tnow() + self._state = ResourceState.PROVISIONED class ResourceFactory(object): _resource_types = dict() @classmethod def resource_types(cls): + """Return the type of the Class""" return cls._resource_types + @classmethod + def get_resource_type(cls, rtype): + """Return the type of the Class""" + return cls._resource_types.get(rtype) + @classmethod def register_type(cls, rclass): + """Register a new Ressource Manager""" cls._resource_types[rclass.rtype()] = rclass @classmethod def create(cls, rtype, ec, guid): + """Create a new instance of a Ressource Manager""" rclass = cls._resource_types[rtype] return rclass(ec, guid) def populate_factory(): - for rclass in find_types(): - ResourceFactory.register_type(rclass) + """Register all the possible RM that exists in the current version of Nepi. + """ + # Once the factory is populated, don't repopulate + if not ResourceFactory.resource_types(): + for rclass in find_types(): + ResourceFactory.register_type(rclass) def find_types(): + """Look into the different folders to find all the + availables Resources Managers + """ search_path = os.environ.get("NEPI_SEARCH_PATH", "") search_path = set(search_path.split(" ")) - import neco.resources - path = os.path.dirname(neco.resources.__file__) + import inspect + import nepi.resources + path = os.path.dirname(nepi.resources.__file__) search_path.add(path) types = [] - for importer, modname, ispkg in pkgutil.walk_packages(search_path): + for importer, modname, ispkg in pkgutil.walk_packages(search_path, + prefix = "nepi.resources."): + loader = importer.find_module(modname) + try: - module = loader.load_module(loader.fullname) + # Notice: Repeated calls to load_module will act as a reload of teh module + module = loader.load_module(modname) + for attrname in dir(module): if attrname.startswith("_"): continue @@ -598,9 +809,10 @@ def find_types(): types.append(attr) except: import traceback + import logging err = traceback.format_exc() logger = logging.getLogger("Resource.find_types()") - logger.error("Error while lading Resource Managers %s" % err) + logger.error("Error while loading Resource Managers %s" % err) return types