X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fresource.py;h=4394130d75f820bc6b404f11f876b44689c244a2;hb=58a2b493f8df1072a1faa653c8abb6a3f9ba21fa;hp=a7e7758f06ec0761111c07bf79df4b1be7525898;hpb=e41672c333f70c7beb4a9c9c208d77f213092aee;p=nepi.git diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index a7e7758f..4394130d 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -1,59 +1,165 @@ -from neco.util.timefuncs import strfnow, strfdiff, strfvalid -from neco.execution.trace import TraceAttr +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat +from nepi.util.logger import Logger +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.trace import TraceAttr import copy import functools -import inspect import logging import os import pkgutil +import sys +import threading import weakref -reschedule_delay = "0.5s" - class ResourceAction: + """ Action that a user can order to a Resource Manager + + """ DEPLOY = 0 START = 1 STOP = 2 class ResourceState: + """ State of a Resource Manager + + """ NEW = 0 DISCOVERED = 1 - PROVISIONED = 2 - READY = 3 - STARTED = 4 - STOPPED = 5 - FINISHED = 6 + RESERVED = 2 + PROVISIONED = 3 + READY = 4 + STARTED = 5 + STOPPED = 6 FAILED = 7 RELEASED = 8 +ResourceState2str = dict({ + ResourceState.NEW : "NEW", + ResourceState.DISCOVERED : "DISCOVERED", + ResourceState.RESERVED : "RESERVED", + ResourceState.PROVISIONED : "PROVISIONED", + ResourceState.READY : "READY", + ResourceState.STARTED : "STARTED", + ResourceState.STOPPED : "STOPPED", + ResourceState.FAILED : "FAILED", + ResourceState.RELEASED : "RELEASED", + }) + def clsinit(cls): + """ Initializes template information (i.e. attributes and traces) + on classes derived from the ResourceManager class. + + It is used as a decorator in the class declaration as follows: + + @clsinit + class MyResourceManager(ResourceManager): + + ... + + """ + cls._clsinit() return cls -# Decorator to invoke class initialization method +def clsinit_copy(cls): + """ Initializes template information (i.e. attributes and traces) + on classes derived from the ResourceManager class. + It differs from the clsinit method in that it forces inheritance + of attributes and traces from the parent class. + + It is used as a decorator in the class declaration as follows: + + @clsinit + class MyResourceManager(ResourceManager): + + ... + + + clsinit_copy should be prefered to clsinit when creating new + ResourceManager child classes. + + """ + + cls._clsinit_copy() + return cls + +def failtrap(func): + """ Decorator function for instance methods that should set the + RM state to FAILED when an error is raised. The methods that must be + decorated are: discover, reserved, provision, deploy, start, stop. + + """ + def wrapped(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except: + self.fail() + + import traceback + err = traceback.format_exc() + logger = Logger(self._rtype) + logger.error(err) + logger.error("SETTING guid %d to state FAILED" % self.guid) + raise + + return wrapped + @clsinit -class ResourceManager(object): +class ResourceManager(Logger): + """ Base clase for all ResourceManagers. + + A ResourceManger is specific to a resource type (e.g. Node, + Switch, Application, etc) on a specific platform (e.g. PlanetLab, + OMF, etc). + + The ResourceManager instances are responsible for interacting with + and controlling concrete (physical or virtual) resources in the + experimental platforms. + + """ _rtype = "Resource" - _filters = None _attributes = None _traces = None + _help = None + _platform = None + _reschedule_delay = "0.5s" @classmethod - def _register_filter(cls, attr): + def _register_attribute(cls, attr): """ Resource subclasses will invoke this method to add a - filter attribute + resource attribute """ - cls._filters[attr.name] = attr + + cls._attributes[attr.name] = attr @classmethod - def _register_attribute(cls, attr): - """ Resource subclasses will invoke this method to add a + def _remove_attribute(cls, name): + """ Resource subclasses will invoke this method to remove a resource attribute """ - cls._attributes[attr.name] = attr + + del cls._attributes[name] @classmethod def _register_trace(cls, trace): @@ -61,45 +167,69 @@ class ResourceManager(object): resource trace """ + cls._traces[trace.name] = trace - @classmethod - def _register_filters(cls): - """ Resource subclasses will invoke this method to register - resource filters + def _remove_trace(cls, name): + """ Resource subclasses will invoke this method to remove a + resource trace """ - pass + + del cls._traces[name] @classmethod def _register_attributes(cls): """ Resource subclasses will invoke this method to register - resource attributes + resource attributes. - """ - pass + This method should be overriden in the RMs that define + attributes. + """ + critical = Attribute("critical", + "Defines whether the resource is critical. " + "A failure on a critical resource will interrupt " + "the experiment. ", + type = Types.Bool, + default = True, + flags = Flags.Design) + hard_release = Attribute("hardRelease", + "Forces removal of all result files and directories associated " + "to the RM upon resource release. After release the RM will " + "be removed from the EC and the results will not longer be " + "accessible", + type = Types.Bool, + default = False, + flags = Flags.Design) + + cls._register_attribute(critical) + cls._register_attribute(hard_release) + @classmethod def _register_traces(cls): """ Resource subclasses will invoke this method to register resource traces + This method should be overridden in the RMs that define traces. + """ + pass @classmethod def _clsinit(cls): - """ Create a new dictionnary instance of the dictionnary - with the same template. - - Each ressource should have the same registration dictionary - template with different instances. + """ ResourceManager classes have different attributes and traces. + Attribute and traces are stored in 'class attribute' dictionaries. + When a new ResourceManager class is created, the _clsinit method is + called to create a new instance of those dictionaries and initialize + them. + + The _clsinit method is called by the clsinit decorator method. + """ - # static template for resource filters - cls._filters = dict() - cls._register_filters() - + # static template for resource attributes cls._attributes = dict() cls._register_attributes() @@ -109,31 +239,94 @@ class ResourceManager(object): cls._register_traces() @classmethod - def rtype(cls): - return cls._rtype + def _clsinit_copy(cls): + """ Same as _clsinit, except that after creating new instances of the + dictionaries it copies all the attributes and traces from the parent + class. + + The _clsinit_copy method is called by the clsinit_copy decorator method. + + """ + # static template for resource attributes + cls._attributes = copy.deepcopy(cls._attributes) + cls._register_attributes() + + # static template for resource traces + cls._traces = copy.deepcopy(cls._traces) + cls._register_traces() @classmethod - def get_filters(cls): - """ Returns a copy of the filters + def get_rtype(cls): + """ Returns the type of the Resource Manager """ - return copy.deepcopy(cls._filters.values()) + return cls._rtype @classmethod def get_attributes(cls): """ Returns a copy of the attributes """ - return copy.deepcopy(cls._attributes.values()) + return copy.deepcopy(list(cls._attributes.values())) + + @classmethod + def get_attribute(cls, name): + """ Returns a copy of the attribute with name 'name' + + """ + return copy.deepcopy(cls._attributes[name]) @classmethod def get_traces(cls): """ Returns a copy of the traces """ - return copy.deepcopy(cls._traces.values()) + return copy.deepcopy(list(cls._traces.values())) + + @classmethod + def get_help(cls): + """ Returns the description of the type of Resource + + """ + return cls._help + + @classmethod + def get_platform(cls): + """ Returns the identified of the platform (i.e. testbed type) + for the Resource + + """ + return cls._platform + + @classmethod + def get_global(cls, name): + """ Returns the value of a global attribute + Global attribute meaning an attribute for + all the resources from a rtype + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + global_attr = cls._attributes[name] + return global_attr.value + + @classmethod + def set_global(cls, name, value): + """ Set value for a global attribute + + :param name: Name of the attribute + :type name: str + :param name: Value of the attribute + :type name: str + """ + global_attr = cls._attributes[name] + global_attr.value = value + return value def __init__(self, ec, guid): + super(ResourceManager, self).__init__(self.get_rtype()) + self._guid = guid self._ec = weakref.ref(ec) self._connections = set() @@ -145,131 +338,291 @@ class ResourceManager(object): # the resource instance gets a copy of all traces self._trcs = copy.deepcopy(self._traces) - self._state = ResourceState.NEW + # Each resource is placed on a deployment group by the EC + # during deployment + self.deployment_group = None self._start_time = None self._stop_time = None self._discover_time = None + self._reserved_time = None self._provision_time = None self._ready_time = None self._release_time = None + self._failed_time = None - # Logging - self._logger = logging.getLogger("Resource") - - def debug(self, msg, out = None, err = None): - self.log(msg, logging.DEBUG, out, err) - - def error(self, msg, out = None, err = None): - self.log(msg, logging.ERROR, out, err) - - def warn(self, msg, out = None, err = None): - self.log(msg, logging.WARNING, out, err) - - def info(self, msg, out = None, err = None): - self.log(msg, logging.INFO, out, err) - - def log(self, msg, level, out = None, err = None): - if out: - msg += " - OUT: %s " % out - - if err: - msg += " - ERROR: %s " % err - - msg = self.log_message(msg) - - self.logger.log(level, msg) - - def log_message(self, msg): - return " %s guid: %d - %s " % (self._rtype, self.guid, msg) + self._state = ResourceState.NEW - @property - def logger(self): - return self._logger + # instance lock to synchronize exclusive state change methods (such + # as deploy and release methods), in order to prevent them from being + # executed at the same time and corrupt internal resource state + self._release_lock = threading.Lock() @property def guid(self): + """ Returns the global unique identifier of the RM """ return self._guid @property def ec(self): + """ Returns the Experiment Controller of the RM """ return self._ec() @property def connections(self): + """ Returns the set of guids of connected RMs """ return self._connections @property def conditions(self): + """ Returns the conditions to which the RM is subjected to. + + This method returns a dictionary of conditions lists indexed by + a ResourceAction. + + """ return self._conditions @property def start_time(self): - """ Returns timestamp with the time the RM started """ + """ Returns the start time of the RM as a timestamp """ return self._start_time @property def stop_time(self): - """ Returns timestamp with the time the RM stopped """ + """ Returns the stop time of the RM as a timestamp """ return self._stop_time @property def discover_time(self): - """ Returns timestamp with the time the RM passed to state discovered """ + """ Returns the discover time of the RM as a timestamp """ return self._discover_time + @property + def reserved_time(self): + """ Returns the reserved time of the RM as a timestamp """ + return self._reserved_time + @property def provision_time(self): - """ Returns timestamp with the time the RM passed to state provisioned """ + """ Returns the provision time of the RM as a timestamp """ return self._provision_time @property def ready_time(self): - """ Returns timestamp with the time the RM passed to state ready """ + """ Returns the deployment time of the RM as a timestamp """ return self._ready_time @property def release_time(self): - """ Returns timestamp with the time the RM was released """ + """ Returns the release time of the RM as a timestamp """ return self._release_time + @property + def failed_time(self): + """ Returns the time failure occurred for the RM as a timestamp """ + return self._failed_time + @property def state(self): + """ Get the current state of the RM """ return self._state - def connect(self, guid): + @property + def reschedule_delay(self): + """ Returns default reschedule delay """ + return self._reschedule_delay + + def log_message(self, msg): + """ Returns the log message formatted with added information. + + :param msg: text message + :type msg: str + :rtype: str + + """ + return " %s guid %d - %s " % (self._rtype, self.guid, msg) + + def register_connection(self, guid): + """ Registers a connection to the RM identified by guid + + This method should not be overridden. Specific functionality + should be added in the do_connect method. + + :param guid: Global unique identified of the RM to connect to + :type guid: int + + """ if self.valid_connection(guid): + self.do_connect(guid) self._connections.add(guid) - def discover(self, filters = None): - self._discover_time = strfnow() - self._state = ResourceState.DISCOVERED + def unregister_connection(self, guid): + """ Removes a registered connection to the RM identified by guid + + This method should not be overridden. Specific functionality + should be added in the do_disconnect method. + + :param guid: Global unique identified of the RM to connect to + :type guid: int + + """ + if guid in self._connections: + self.do_disconnect(guid) + self._connections.remove(guid) + + @failtrap + def discover(self): + """ Performs resource discovery. + + This method is responsible for selecting an individual resource + matching user requirements. + + This method should not be overridden directly. Specific functionality + should be added in the do_discover method. + + """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_discover() - def provision(self, filters = None): - self._provision_time = strfnow() - self._state = ResourceState.PROVISIONED + @failtrap + def reserve(self): + """ Performs resource reserve. + + This method is responsible for reserving an individual resource + matching user requirements. + This method should not be overridden directly. Specific functionality + should be added in the do_reserved method. + + """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_reserve() + + @failtrap + def provision(self): + """ Performs resource provisioning. + + This method is responsible for provisioning one resource. + After this method has been successfully invoked, the resource + should be accessible/controllable by the RM. + + This method should not be overridden directly. Specific functionality + should be added in the do_provision method. + + """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_provision() + + @failtrap + def configure(self): + """ Performs resource configuration. + + This method is responsible for configuring one resource. + After this method has been successfully invoked, the resource + should be set up to start the experimentation. + + This method should not be overridden directly. Specific functionality + should be added in the do_configure method. + + """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_configure() + + @failtrap def start(self): - """ Start the Resource Manager + """ Starts the RM (e.g. launch remote process). + + There is no standard start behavior. Some RMs will not need to perform + any actions upon start. + + This method should not be overridden directly. Specific functionality + should be added in the do_start method. """ - if not self._state in [ResourceState.READY, ResourceState.STOPPED]: + + if not self.state in [ResourceState.READY, ResourceState.STOPPED]: self.error("Wrong state %s for start" % self.state) return - self._start_time = strfnow() - self._state = ResourceState.STARTED + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_start() + @failtrap def stop(self): - """ Start the Resource Manager - + """ Interrupts the RM, stopping any tasks the RM was performing. + + There is no standard stop behavior. Some RMs will not need to perform + any actions upon stop. + + This method should not be overridden directly. Specific functionality + should be added in the do_stop method. + """ - if not self._state in [ResourceState.STARTED]: + if not self.state in [ResourceState.STARTED]: self.error("Wrong state %s for stop" % self.state) return + + with self._release_lock: + self.do_stop() + + @failtrap + def deploy(self): + """ Execute all steps required for the RM to reach the state READY. + + This method is responsible for deploying the resource (and invoking + the discover and provision methods). + + This method should not be overridden directly. Specific functionality + should be added in the do_deploy method. + + """ + if self.state > ResourceState.READY: + self.error("Wrong state %s for deploy" % self.state) + return - self._stop_time = strfnow() - self._state = ResourceState.STOPPED + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_deploy() + + def release(self): + """ Perform actions to free resources used by the RM. + + This method is responsible for releasing resources that were + used during the experiment by the RM. + + This method should not be overridden directly. Specific functionality + should be added in the do_release method. + + """ + with self._release_lock: + try: + self.do_release() + except: + self.set_released() + + import traceback + err = traceback.format_exc() + msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % ( + self._rtype, self.guid, err) + logger = Logger(self._rtype) + logger.debug(msg) + + def fail(self): + """ Sets the RM to state FAILED. + + This method should not be overridden directly. Specific functionality + should be added in the do_fail method. + + """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_fail() def set(self, name, value): """ Set the value of the attribute @@ -281,26 +634,71 @@ class ResourceManager(object): """ attr = self._attrs[name] attr.value = value + return value def get(self, name): - """ Start the Resource Manager + """ Returns the value of the attribute :param name: Name of the attribute :type name: str :rtype: str """ attr = self._attrs[name] + + """ + A.Q. Commenting due to performance impact + if attr.has_flag(Flags.Global): + self.warning( "Attribute %s is global. Use get_global instead." % name) + """ + return attr.value - def register_trace(self, name): - """ Enable trace + def has_changed(self, name): + """ Returns the True is the value of the attribute + has been modified by the user. + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + attr = self._attrs[name] + return attr.has_changed + + def has_flag(self, name, flag): + """ Returns true if the attribute has the flag 'flag' + + :param flag: Flag to be checked + :type flag: Flags + """ + attr = self._attrs[name] + return attr.has_flag(flag) + + def has_attribute(self, name): + """ Returns true if the RM has an attribute with name + + :param name: name of the attribute + :type name: string + """ + return name in self._attrs + + def enable_trace(self, name): + """ Explicitly enable trace generation :param name: Name of the trace :type name: str """ trace = self._trcs[name] trace.enabled = True + + def trace_enabled(self, name): + """Returns True if trace is enables + :param name: Name of the trace + :type name: str + """ + trace = self._trcs[name] + return trace.enabled + def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): """ Get information on collected trace @@ -324,8 +722,7 @@ class ResourceManager(object): """ pass - def register_condition(self, action, group, state, - time = None): + def register_condition(self, action, group, state, time = None): """ Registers a condition on the resource manager to allow execution of 'action' only after 'time' has elapsed from the moment all resources in 'group' reached state 'state' @@ -334,16 +731,17 @@ class ResourceManager(object): :type action: str :param group: Group of RMs to wait for (list of guids) :type group: int or list of int - :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED') + :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY') :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str """ + + if not action in self.conditions: + self._conditions[action] = list() + conditions = self.conditions.get(action) - if not conditions: - conditions = list() - self._conditions[action] = conditions # For each condition to register a tuple of (group, state, time) is # added to the 'action' list @@ -352,21 +750,72 @@ class ResourceManager(object): conditions.append((group, state, time)) - def get_connected(self, rtype): + def unregister_condition(self, group, action = None): + """ Removed conditions for a certain group of guids + + :param action: Action to restrict to condition (either 'START', 'STOP' or 'READY') + :type action: str + + :param group: Group of RMs to wait for (list of guids) + :type group: int or list of int + + """ + # For each condition a tuple of (group, state, time) is + # added to the 'action' list + if not isinstance(group, list): + group = [group] + + for act, conditions in self.conditions.items(): + if action and act != action: + continue + + for condition in list(conditions): + (grp, state, time) = condition + + # If there is an intersection between grp and group, + # then remove intersected elements + intsec = set(group).intersection(set(grp)) + if intsec: + idx = conditions.index(condition) + newgrp = set(grp) + newgrp.difference_update(intsec) + conditions[idx] = (newgrp, state, time) + + def get_connected(self, rtype = None): + """ Returns the list of RM with the type 'rtype' + + :param rtype: Type of the RM we look for + :type rtype: str + :return: list of guid + """ connected = [] + rclass = ResourceFactory.get_resource_type(rtype) for guid in self.connections: rm = self.ec.get_resource(guid) - if rm.rtype() == rtype: + if not rtype or isinstance(rm, rclass): connected.append(rm) return connected + def is_rm_instance(self, rtype): + """ Returns True if the RM is instance of 'rtype' + + :param rtype: Type of the RM we look for + :type rtype: str + :return: True|False + """ + rclass = ResourceFactory.get_resource_type(rtype) + if isinstance(self, rclass): + return True + return False + + @failtrap def _needs_reschedule(self, group, state, time): """ Internal method that verify if 'time' has elapsed since all elements in 'group' have reached state 'state'. :param group: Group of RMs to wait for (list of guids) :type group: int or list of int - :param state: State to wait for on all RM in group. (either 'STARTED' or 'STOPPED') + :param state: State to wait for on all RM in group. (either 'STARTED', 'STOPPED' or 'READY') :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str @@ -377,13 +826,22 @@ class ResourceManager(object): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay # check state and time elapsed on all RMs for guid in group: rm = self.ec.get_resource(guid) + + # If one of the RMs this resource needs to wait for has FAILED + # and is critical we raise an exception + if rm.state == ResourceState.FAILED: + if not rm.get('critical'): + continue + msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED" + raise RuntimeError(msg) + # If the RM state is lower than the requested state we must - # reschedule (e.g. if RM is READY but we required STARTED) + # reschedule (e.g. if RM is READY but we required STARTED). if rm.state < state: reschedule = True break @@ -393,7 +851,9 @@ class ResourceManager(object): if time: if state == ResourceState.DISCOVERED: t = rm.discover_time - if state == ResourceState.PROVISIONED: + elif state == ResourceState.RESERVED: + t = rm.reserved_time + elif state == ResourceState.PROVISIONED: t = rm.provision_time elif state == ResourceState.READY: t = rm.ready_time @@ -401,22 +861,28 @@ class ResourceManager(object): t = rm.start_time elif state == ResourceState.STOPPED: t = rm.stop_time + elif state == ResourceState.RELEASED: + t = rm.release_time else: - # Only keep time information for START and STOP break - d = strfdiff(strfnow(), t) - wait = strfdiff(strfvalid(time),strfvalid(str(d)+"s")) + # time already elapsed since RM changed state + waited = "%fs" % tdiffsec(tnow(), t) + + # time still to wait + wait = tdiffsec(stabsformat(time), stabsformat(waited)) + if wait > 0.001: reschedule = True delay = "%fs" % wait break + return reschedule, delay def set_with_conditions(self, name, value, group, state, time): """ Set value 'value' on attribute with name 'name' when 'time' - has elapsed since all elements in 'group' have reached state - 'state' + has elapsed since all elements in 'group' have reached state + 'state' :param name: Name of the attribute to set :type name: str @@ -428,11 +894,10 @@ class ResourceManager(object): :type state: str :param time: Time to wait after 'state' is reached on all RMs in group. (e.g. '2s') :type time: str - """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay ## evaluate if set conditions are met @@ -454,12 +919,17 @@ class ResourceManager(object): action 'START' are satisfied. """ + #import pdb;pdb.set_trace() + reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay - ## evaluate if set conditions are met - # only can start when RM is either STOPPED or READY + ## evaluate if conditions to start are met + if self.ec.abort: + return + + # Can only start when RM is either STOPPED or READY if self.state not in [ResourceState.STOPPED, ResourceState.READY]: reschedule = True self.debug("---- RESCHEDULING START ---- state %s " % self.state ) @@ -494,13 +964,16 @@ class ResourceManager(object): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay - ## evaluate if set conditions are met + ## evaluate if conditions to stop are met + if self.ec.abort: + return # only can stop when RM is STARTED if self.state != ResourceState.STARTED: reschedule = True + self.debug("---- RESCHEDULING STOP ---- state %s " % self.state ) else: self.debug(" ---- STOP CONDITIONS ---- %s" % self.conditions.get(ResourceAction.STOP)) @@ -515,30 +988,68 @@ class ResourceManager(object): callback = functools.partial(self.stop_with_conditions) self.ec.schedule(delay, callback) else: - self.logger.debug(" ----- STOPPING ---- ") + self.debug(" ----- STOPPING ---- ") self.stop() - def deploy(self): - """ Execute all steps required for the RM to reach the state READY + def deploy_with_conditions(self): + """ Deploy RM when all the conditions in self.conditions for + action 'READY' are satisfied. """ - if self._state > ResourceState.READY: - self.error("Wrong state %s for deploy" % self.state) - return + reschedule = False + delay = self.reschedule_delay - self.debug("----- DEPLOYING ---- ") - self._ready_time = strfnow() - self._state = ResourceState.READY + ## evaluate if conditions to deploy are met + if self.ec.abort: + return - def release(self): - """Clean the resource at the end of the Experiment and change the status + # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED + if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, + ResourceState.RESERVED, ResourceState.PROVISIONED]: + #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES! + reschedule = True + self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state ) + else: + deploy_conditions = self.conditions.get(ResourceAction.DEPLOY, []) + + self.debug("---- DEPLOY CONDITIONS ---- %s" % deploy_conditions) + + # Verify all start conditions are met + for (group, state, time) in deploy_conditions: + # Uncomment for debug + #unmet = [] + #for guid in group: + # rm = self.ec.get_resource(guid) + # unmet.append((guid, rm._state)) + + #self.debug("---- WAITED STATES ---- %s" % unmet ) + + reschedule, delay = self._needs_reschedule(group, state, time) + if reschedule: + break + + if reschedule: + self.ec.schedule(delay, self.deploy_with_conditions) + else: + self.debug("----- DEPLOYING ---- ") + self.deploy() + + def do_connect(self, guid): + """ Performs actions that need to be taken upon associating RMs. + This method should be redefined when necessary in child classes. + """ + pass + def do_disconnect(self, guid): + """ Performs actions that need to be taken upon disassociating RMs. + This method should be redefined when necessary in child classes. """ - self._release_time = strfnow() - self._state = ResourceState.RELEASED + pass def valid_connection(self, guid): - """Check if the connection is available. + """Checks whether a connection with the other RM + is valid. + This method need to be redefined by each new Resource Manager. :param guid: Guid of the current Resource Manager :type guid: int @@ -548,40 +1059,153 @@ class ResourceManager(object): # TODO: Validate! return True + def do_discover(self): + self.set_discovered() + + def do_reserve(self): + self.set_reserved() + + def do_provision(self): + self.set_provisioned() + + def do_configure(self): + pass + + def do_start(self): + self.set_started() + + def do_stop(self): + self.set_stopped() + + def do_deploy(self): + self.set_ready() + + def do_release(self): + self.set_released() + + def do_fail(self): + self.set_failed() + self.ec.inform_failure(self.guid) + + def set_started(self, time = None): + """ Mark ResourceManager as STARTED """ + self.set_state(ResourceState.STARTED, "_start_time", time) + self.debug("----- STARTED ---- ") + + def set_stopped(self, time = None): + """ Mark ResourceManager as STOPPED """ + self.set_state(ResourceState.STOPPED, "_stop_time", time) + self.debug("----- STOPPED ---- ") + + def set_ready(self, time = None): + """ Mark ResourceManager as READY """ + self.set_state(ResourceState.READY, "_ready_time", time) + self.debug("----- READY ---- ") + + def set_released(self, time = None): + """ Mark ResourceManager as REALEASED """ + self.set_state(ResourceState.RELEASED, "_release_time", time) + + msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid) + logger = Logger(self._rtype) + logger.debug(msg) + + def set_failed(self, time = None): + """ Mark ResourceManager as FAILED """ + self.set_state(ResourceState.FAILED, "_failed_time", time) + + msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid) + logger = Logger(self._rtype) + logger.debug(msg) + + def set_discovered(self, time = None): + """ Mark ResourceManager as DISCOVERED """ + self.set_state(ResourceState.DISCOVERED, "_discover_time", time) + self.debug("----- DISCOVERED ---- ") + + def set_reserved(self, time = None): + """ Mark ResourceManager as RESERVED """ + self.set_state(ResourceState.RESERVED, "_reserved_time", time) + self.debug("----- RESERVED ---- ") + + def set_provisioned(self, time = None): + """ Mark ResourceManager as PROVISIONED """ + self.set_state(ResourceState.PROVISIONED, "_provision_time", time) + self.debug("----- PROVISIONED ---- ") + + def set_state(self, state, state_time_attr, time = None): + """ Set the state of the RM while keeping a trace of the time """ + + # Ensure that RM state will not change after released + if self._state == ResourceState.RELEASED: + return + + time = time or tnow() + self.set_state_time(state, state_time_attr, time) + + def set_state_time(self, state, state_time_attr, time): + """ Set the time for the RM state change """ + setattr(self, state_time_attr, time) + self._state = state + class ResourceFactory(object): _resource_types = dict() @classmethod def resource_types(cls): + """Return the type of the Class""" return cls._resource_types + @classmethod + def get_resource_type(cls, rtype): + """Return the type of the Class""" + return cls._resource_types.get(rtype) + @classmethod def register_type(cls, rclass): - cls._resource_types[rclass.rtype()] = rclass + """Register a new Ressource Manager""" + cls._resource_types[rclass.get_rtype()] = rclass @classmethod def create(cls, rtype, ec, guid): + """Create a new instance of a Ressource Manager""" rclass = cls._resource_types[rtype] return rclass(ec, guid) def populate_factory(): - for rclass in find_types(): - ResourceFactory.register_type(rclass) + """Find and rgister all available RMs + """ + # Once the factory is populated, don't repopulate + if not ResourceFactory.resource_types(): + for rclass in find_types(): + ResourceFactory.register_type(rclass) def find_types(): + """Look into the different folders to find all the + availables Resources Managers + """ search_path = os.environ.get("NEPI_SEARCH_PATH", "") search_path = set(search_path.split(" ")) - import neco.resources - path = os.path.dirname(neco.resources.__file__) + import inspect + import nepi.resources + path = os.path.dirname(nepi.resources.__file__) search_path.add(path) - types = [] + types = set() + + for importer, modname, ispkg in pkgutil.walk_packages(search_path, + prefix = "nepi.resources."): - for importer, modname, ispkg in pkgutil.walk_packages(search_path): loader = importer.find_module(modname) + try: - module = loader.load_module(loader.fullname) + # Notice: Repeated calls to load_module will act as a reload of the module + if modname in sys.modules: + module = sys.modules.get(modname) + else: + module = loader.load_module(modname) + for attrname in dir(module): if attrname.startswith("_"): continue @@ -595,13 +1219,17 @@ def find_types(): continue if issubclass(attr, ResourceManager): - types.append(attr) + types.add(attr) + + if not modname in sys.modules: + sys.modules[modname] = module + except: import traceback + import logging err = traceback.format_exc() logger = logging.getLogger("Resource.find_types()") - logger.error("Error while lading Resource Managers %s" % err) + logger.error("Error while loading Resource Managers %s" % err) return types -