X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fresource.py;h=a9087aec4b2db4b7136f32c27c358056df1146b5;hb=573bc6721a33d323047f204841dd58ef2b83d195;hp=dd5a76445e9fc174f7dbbe337b0c512fd8d0159c;hpb=62a8f5c90afe9033f532206c811cff8ea76b2c09;p=nepi.git diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index dd5a7644..a9087aec 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -17,7 +17,7 @@ # # Author: Alina Quereilhac -from nepi.util.timefuncs import strfnow, strfdiff, strfvalid +from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat from nepi.util.logger import Logger from nepi.execution.trace import TraceAttr @@ -26,16 +26,23 @@ import functools import logging import os import pkgutil +import sys import weakref -reschedule_delay = "0.5s" +reschedule_delay = "1s" class ResourceAction: + """ Action that a user can order to a Resource Manager + + """ DEPLOY = 0 START = 1 STOP = 2 class ResourceState: + """ State of a Resource Manager + + """ NEW = 0 DISCOVERED = 1 PROVISIONED = 2 @@ -76,9 +83,22 @@ def clsinit_copy(cls): # Decorator to invoke class initialization method @clsinit class ResourceManager(Logger): + """ Base clase for all ResourceManagers. + + A ResourceManger is specific to a resource type (e.g. Node, + Switch, Application, etc) on a specific backend (e.g. PlanetLab, + OMF, etc). + + The ResourceManager instances are responsible for interacting with + and controlling concrete (physical or virtual) resources in the + experimental backends. + + """ _rtype = "Resource" _attributes = None _traces = None + _help = None + _backend = None @classmethod def _register_attribute(cls, attr): @@ -161,6 +181,9 @@ class ResourceManager(Logger): @classmethod def rtype(cls): + """ Returns the type of the Resource Manager + + """ return cls._rtype @classmethod @@ -177,6 +200,21 @@ class ResourceManager(Logger): """ return copy.deepcopy(cls._traces.values()) + @classmethod + def get_help(cls): + """ Returns the description of the type of Resource + + """ + return cls._help + + @classmethod + def get_backend(cls): + """ Returns the identified of the backend (i.e. testbed, environment) + for the Resource + + """ + return cls._backend + def __init__(self, ec, guid): super(ResourceManager, self).__init__(self.rtype()) @@ -191,7 +229,9 @@ class ResourceManager(Logger): # the resource instance gets a copy of all traces self._trcs = copy.deepcopy(self._traces) - self._state = ResourceState.NEW + # Each resource is placed on a deployment group by the EC + # during deployment + self.deployment_group = None self._start_time = None self._stop_time = None @@ -199,93 +239,170 @@ class ResourceManager(Logger): self._provision_time = None self._ready_time = None self._release_time = None + self._finish_time = None + self._failed_time = None + + self._state = ResourceState.NEW @property def guid(self): + """ Returns the global unique identifier of the RM """ return self._guid @property def ec(self): + """ Returns the Experiment Controller """ return self._ec() @property def connections(self): + """ Returns the set of guids of connected RMs""" return self._connections @property def conditions(self): + """ Returns the conditions to which the RM is subjected to. + + The object returned by this method is a dictionary indexed by + ResourceAction.""" return self._conditions @property def start_time(self): - """ Returns timestamp with the time the RM started """ + """ Returns the start time of the RM as a timestamp""" return self._start_time @property def stop_time(self): - """ Returns timestamp with the time the RM stopped """ + """ Returns the stop time of the RM as a timestamp""" return self._stop_time @property def discover_time(self): - """ Returns timestamp with the time the RM passed to state discovered """ + """ Returns the time discovering was finished for the RM as a timestamp""" return self._discover_time @property def provision_time(self): - """ Returns timestamp with the time the RM passed to state provisioned """ + """ Returns the time provisioning was finished for the RM as a timestamp""" return self._provision_time @property def ready_time(self): - """ Returns timestamp with the time the RM passed to state ready """ + """ Returns the time deployment was finished for the RM as a timestamp""" return self._ready_time @property def release_time(self): - """ Returns timestamp with the time the RM was released """ + """ Returns the release time of the RM as a timestamp""" return self._release_time + @property + def finish_time(self): + """ Returns the finalization time of the RM as a timestamp""" + return self._finish_time + + @property + def failed_time(self): + """ Returns the time failure occured for the RM as a timestamp""" + return self._failed_time + @property def state(self): + """ Get the state of the current RM """ return self._state def log_message(self, msg): + """ Returns the log message formatted with added information. + + :param msg: text message + :type msg: str + :rtype: str + """ return " %s guid: %d - %s " % (self._rtype, self.guid, msg) - def connect(self, guid): + def register_connection(self, guid): + """ Registers a connection to the RM identified by guid + + :param guid: Global unique identified of the RM to connect to + :type guid: int + """ if self.valid_connection(guid): + self.connect(guid) self._connections.add(guid) + def unregister_connection(self, guid): + """ Removes a registered connection to the RM identified by guid + + :param guid: Global unique identified of the RM to connect to + :type guid: int + """ + if guid in self._connections: + self.disconnect(guid) + self._connections.remove(guid) + def discover(self): - self._discover_time = strfnow() - self._state = ResourceState.DISCOVERED + """ Performs resource discovery. + + This method is resposible for selecting an individual resource + matching user requirements. + This method should be redefined when necessary in child classes. + """ + self.set_discovered() def provision(self): - self._provision_time = strfnow() - self._state = ResourceState.PROVISIONED + """ Performs resource provisioning. - def start(self): - """ Start the Resource Manager + This method is resposible for provisioning one resource. + After this method has been successfully invoked, the resource + should be acccesible/controllable by the RM. + This method should be redefined when necessary in child classes. + """ + self.set_provisioned() + def start(self): + """ Starts the resource. + + There is no generic start behavior for all resources. + This method should be redefined when necessary in child classes. """ - if not self._state in [ResourceState.READY, ResourceState.STOPPED]: + if not self.state in [ResourceState.READY, ResourceState.STOPPED]: self.error("Wrong state %s for start" % self.state) return - self._start_time = strfnow() - self._state = ResourceState.STARTED + self.set_started() def stop(self): - """ Start the Resource Manager - + """ Stops the resource. + + There is no generic stop behavior for all resources. + This method should be redefined when necessary in child classes. """ - if not self._state in [ResourceState.STARTED]: + if not self.state in [ResourceState.STARTED]: self.error("Wrong state %s for stop" % self.state) return + + self.set_stopped() - self._stop_time = strfnow() - self._state = ResourceState.STOPPED + def deploy(self): + """ Execute all steps required for the RM to reach the state READY + + """ + if self.state > ResourceState.READY: + self.error("Wrong state %s for deploy" % self.state) + return + + self.debug("----- READY ---- ") + self.set_ready() + + def release(self): + self.set_released() + + def finish(self): + self.set_finished() + + def fail(self): + self.set_failed() def set(self, name, value): """ Set the value of the attribute @@ -299,7 +416,7 @@ class ResourceManager(Logger): attr.value = value def get(self, name): - """ Start the Resource Manager + """ Returns the value of the attribute :param name: Name of the attribute :type name: str @@ -308,15 +425,24 @@ class ResourceManager(Logger): attr = self._attrs[name] return attr.value - def register_trace(self, name): - """ Enable trace + def enable_trace(self, name): + """ Explicitly enable trace generation :param name: Name of the trace :type name: str """ trace = self._trcs[name] trace.enabled = True + + def trace_enabled(self, name): + """Returns True if trace is enables + :param name: Name of the trace + :type name: str + """ + trace = self._trcs[name] + return trace.enabled + def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): """ Get information on collected trace @@ -340,8 +466,7 @@ class ResourceManager(Logger): """ pass - def register_condition(self, action, group, state, - time = None): + def register_condition(self, action, group, state, time = None): """ Registers a condition on the resource manager to allow execution of 'action' only after 'time' has elapsed from the moment all resources in 'group' reached state 'state' @@ -350,16 +475,17 @@ class ResourceManager(Logger): :type action: str :param group: Group of RMs to wait for (list of guids) :type group: int or list of int - :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED') + :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY') :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str """ + + if not action in self.conditions: + self._conditions[action] = list() + conditions = self.conditions.get(action) - if not conditions: - conditions = list() - self._conditions[action] = conditions # For each condition to register a tuple of (group, state, time) is # added to the 'action' list @@ -368,11 +494,49 @@ class ResourceManager(Logger): conditions.append((group, state, time)) - def get_connected(self, rtype): + def unregister_condition(self, group, action = None): + """ Removed conditions for a certain group of guids + + :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY') + :type action: str + + :param group: Group of RMs to wait for (list of guids) + :type group: int or list of int + + """ + # For each condition a tuple of (group, state, time) is + # added to the 'action' list + if not isinstance(group, list): + group = [group] + + for act, conditions in self.conditions.iteritems(): + if action and act != action: + continue + + for condition in list(conditions): + (grp, state, time) = condition + + # If there is an intersection between grp and group, + # then remove intersected elements + intsec = set(group).intersection(set(grp)) + if intsec: + idx = conditions.index(condition) + newgrp = set(grp) + newgrp.difference_update(intsec) + conditions[idx] = (newgrp, state, time) + + def get_connected(self, rtype = None): + """ Returns the list of RM with the type 'rtype' + + :param rtype: Type of the RM we look for + :type rtype: str + :return: list of guid + """ connected = [] + rclass = ResourceFactory.get_resource_type(rtype) for guid in self.connections: rm = self.ec.get_resource(guid) - if rm.rtype() == rtype: + if not rtype or isinstance(rm, rclass): connected.append(rm) return connected @@ -382,7 +546,7 @@ class ResourceManager(Logger): :param group: Group of RMs to wait for (list of guids) :type group: int or list of int - :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED') + :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY') :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str @@ -399,7 +563,7 @@ class ResourceManager(Logger): for guid in group: rm = self.ec.get_resource(guid) # If the RM state is lower than the requested state we must - # reschedule (e.g. if RM is READY but we required STARTED) + # reschedule (e.g. if RM is READY but we required STARTED). if rm.state < state: reschedule = True break @@ -418,21 +582,25 @@ class ResourceManager(Logger): elif state == ResourceState.STOPPED: t = rm.stop_time else: - # Only keep time information for START and STOP break - d = strfdiff(strfnow(), t) - wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s")) + # time already elapsed since RM changed state + waited = "%fs" % tdiffsec(tnow(), t) + + # time still to wait + wait = tdiffsec(stabsformat(time), stabsformat(waited)) + if wait > 0.001: reschedule = True delay = "%fs" % wait break + return reschedule, delay def set_with_conditions(self, name, value, group, state, time): """ Set value 'value' on attribute with name 'name' when 'time' - has elapsed since all elements in 'group' have reached state - 'state' + has elapsed since all elements in 'group' have reached state + 'state' :param name: Name of the attribute to set :type name: str @@ -444,7 +612,6 @@ class ResourceManager(Logger): :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str - """ reschedule = False @@ -534,27 +701,63 @@ class ResourceManager(Logger): self.debug(" ----- STOPPING ---- ") self.stop() - def deploy(self): - """ Execute all steps required for the RM to reach the state READY + def deploy_with_conditions(self): + """ Deploy RM when all the conditions in self.conditions for + action 'READY' are satisfied. """ - if self._state > ResourceState.READY: - self.error("Wrong state %s for deploy" % self.state) - return + reschedule = False + delay = reschedule_delay - self.debug("----- DEPLOYING ---- ") - self._ready_time = strfnow() - self._state = ResourceState.READY + ## evaluate if set conditions are met - def release(self): - """Clean the resource at the end of the Experiment and change the status + # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED + if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, + ResourceState.PROVISIONED]: + reschedule = True + self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state ) + else: + deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, []) + + self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) + + # Verify all start conditions are met + for (group, state, time) in deploy_conditions: + # Uncomment for debug + #unmet = [] + #for guid in group: + # rm = self.ec.get_resource(guid) + # unmet.append((guid, rm._state)) + # + #self.debug("---- WAITED STATES ---- %s" % unmet ) + reschedule, delay = self._needs_reschedule(group, state, time) + if reschedule: + break + + if reschedule: + self.ec.schedule(delay, self.deploy_with_conditions) + else: + self.debug("----- STARTING ---- ") + self.deploy() + + + def connect(self, guid): + """ Performs actions that need to be taken upon associating RMs. + This method should be redefined when necessary in child classes. """ - self._release_time = strfnow() - self._state = ResourceState.RELEASED + pass + + def disconnect(self, guid): + """ Performs actions that need to be taken upon disassociating RMs. + This method should be redefined when necessary in child classes. + """ + pass def valid_connection(self, guid): - """Check if the connection is available. + """Checks whether a connection with the other RM + is valid. + This method need to be redefined by each new Resource Manager. :param guid: Guid of the current Resource Manager :type guid: int @@ -563,28 +766,83 @@ class ResourceManager(Logger): """ # TODO: Validate! return True + + def set_started(self): + """ Mark ResourceManager as STARTED """ + self._start_time = tnow() + self._state = ResourceState.STARTED + + def set_stopped(self): + """ Mark ResourceManager as STOPPED """ + self._stop_time = tnow() + self._state = ResourceState.STOPPED + + def set_ready(self): + """ Mark ResourceManager as READY """ + self._ready_time = tnow() + self._state = ResourceState.READY + + def set_released(self): + """ Mark ResourceManager as REALEASED """ + self._release_time = tnow() + self._state = ResourceState.RELEASED + + def set_finished(self): + """ Mark ResourceManager as FINISHED """ + self._finish_time = tnow() + self._state = ResourceState.FINISHED + + def set_failed(self): + """ Mark ResourceManager as FAILED """ + self._failed_time = tnow() + self._state = ResourceState.FAILED + + def set_discovered(self): + """ Mark ResourceManager as DISCOVERED """ + self._discover_time = tnow() + self._state = ResourceState.DISCOVERED + + def set_provisioned(self): + """ Mark ResourceManager as PROVISIONED """ + self._provision_time = tnow() + self._state = ResourceState.PROVISIONED class ResourceFactory(object): _resource_types = dict() @classmethod def resource_types(cls): + """Return the type of the Class""" return cls._resource_types + @classmethod + def get_resource_type(cls, rtype): + """Return the type of the Class""" + return cls._resource_types.get(rtype) + @classmethod def register_type(cls, rclass): + """Register a new Ressource Manager""" cls._resource_types[rclass.rtype()] = rclass @classmethod def create(cls, rtype, ec, guid): + """Create a new instance of a Ressource Manager""" rclass = cls._resource_types[rtype] return rclass(ec, guid) def populate_factory(): - for rclass in find_types(): - ResourceFactory.register_type(rclass) + """Register all the possible RM that exists in the current version of Nepi. + """ + # Once the factory is populated, don't repopulate + if not ResourceFactory.resource_types(): + for rclass in find_types(): + ResourceFactory.register_type(rclass) def find_types(): + """Look into the different folders to find all the + availables Resources Managers + """ search_path = os.environ.get("NEPI_SEARCH_PATH", "") search_path = set(search_path.split(" ")) @@ -595,10 +853,18 @@ def find_types(): types = [] - for importer, modname, ispkg in pkgutil.walk_packages(search_path): + for importer, modname, ispkg in pkgutil.walk_packages(search_path, + prefix = "nepi.resources."): + loader = importer.find_module(modname) + try: - module = loader.load_module(loader.fullname) + # Notice: Repeated calls to load_module will act as a reload of teh module + if modname in sys.modules: + module = sys.modules.get(modname) + else: + module = loader.load_module(modname) + for attrname in dir(module): if attrname.startswith("_"): continue @@ -613,12 +879,16 @@ def find_types(): if issubclass(attr, ResourceManager): types.append(attr) + + if not modname in sys.modules: + sys.modules[modname] = module + except: import traceback import logging err = traceback.format_exc() logger = logging.getLogger("Resource.find_types()") - logger.error("Error while lading Resource Managers %s" % err) + logger.error("Error while loading Resource Managers %s" % err) return types