X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fresource.py;h=f1669d09ae7b2a8763e2f8baca000052e06d5532;hb=23d041fe2f0d9badf6d637009e2d42a4794325c1;hp=18edeb07b18c30f572805a1cb06ca4a0e0a64544;hpb=386498468dfb01f71b0efbbe0c208819f18f82ec;p=nepi.git diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 18edeb07..f1669d09 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -19,6 +19,7 @@ from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat from nepi.util.logger import Logger +from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import TraceAttr import copy @@ -27,10 +28,9 @@ import logging import os import pkgutil import sys +import threading import weakref -reschedule_delay = "1s" - class ResourceAction: """ Action that a user can order to a Resource Manager @@ -49,9 +49,8 @@ class ResourceState: READY = 3 STARTED = 4 STOPPED = 5 - FINISHED = 6 - FAILED = 7 - RELEASED = 8 + FAILED = 6 + RELEASED = 7 ResourceState2str = dict({ ResourceState.NEW : "NEW", @@ -60,45 +59,88 @@ ResourceState2str = dict({ ResourceState.READY : "READY", ResourceState.STARTED : "STARTED", ResourceState.STOPPED : "STOPPED", - ResourceState.FINISHED : "FINISHED", ResourceState.FAILED : "FAILED", ResourceState.RELEASED : "RELEASED", }) def clsinit(cls): """ Initializes template information (i.e. attributes and traces) - for the ResourceManager class - """ + on classes derived from the ResourceManager class. + + It is used as a decorator in the class declaration as follows: + + @clsinit + class MyResourceManager(ResourceManager): + + ... + + """ + cls._clsinit() return cls def clsinit_copy(cls): """ Initializes template information (i.e. attributes and traces) - for the ResourceManager class, inheriting attributes and traces - from the parent class + on classes direved from the ResourceManager class. + It differs from the clsinit method in that it forces inheritance + of attributes and traces from the parent class. + + It is used as a decorator in the class declaration as follows: + + @clsinit + class MyResourceManager(ResourceManager): + + ... + + + clsinit_copy should be prefered to clsinit when creating new + ResourceManager child classes. + """ + cls._clsinit_copy() return cls -# Decorator to invoke class initialization method +def failtrap(func): + """ Decorator function for instance methods that should set the + RM state to FAILED when an error is raised. The methods that must be + decorated are: discover, provision, deploy, start, stop. + + """ + def wrapped(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except: + self.fail() + + import traceback + err = traceback.format_exc() + logger = Logger(self._rtype) + logger.error(err) + logger.error("SETTING guid %d to state FAILED" % self.guid) + raise + + return wrapped + @clsinit class ResourceManager(Logger): """ Base clase for all ResourceManagers. A ResourceManger is specific to a resource type (e.g. Node, - Switch, Application, etc) on a specific backend (e.g. PlanetLab, + Switch, Application, etc) on a specific platform (e.g. PlanetLab, OMF, etc). The ResourceManager instances are responsible for interacting with and controlling concrete (physical or virtual) resources in the - experimental backends. + experimental platforms. """ _rtype = "Resource" _attributes = None _traces = None _help = None - _backend = None + _platform = None + _reschedule_delay = "0.5s" @classmethod def _register_attribute(cls, attr): @@ -106,6 +148,7 @@ class ResourceManager(Logger): resource attribute """ + cls._attributes[attr.name] = attr @classmethod @@ -114,6 +157,7 @@ class ResourceManager(Logger): resource attribute """ + del cls._attributes[name] @classmethod @@ -122,6 +166,7 @@ class ResourceManager(Logger): resource trace """ + cls._traces[trace.name] = trace @classmethod @@ -130,34 +175,60 @@ class ResourceManager(Logger): resource trace """ + del cls._traces[name] @classmethod def _register_attributes(cls): """ Resource subclasses will invoke this method to register - resource attributes + resource attributes. - """ - pass + This method should be overriden in the RMs that define + attributes. + """ + critical = Attribute("critical", + "Defines whether the resource is critical. " + "A failure on a critical resource will interrupt " + "the experiment. ", + type = Types.Bool, + default = True, + flags = Flags.Design) + hard_release = Attribute("hardRelease", + "Forces removal of all result files and directories associated " + "to the RM upon resource release. After release the RM will " + "be removed from the EC and the results will not longer be " + "accessible", + type = Types.Bool, + default = False, + flags = Flags.Design) + + cls._register_attribute(critical) + cls._register_attribute(hard_release) + @classmethod def _register_traces(cls): """ Resource subclasses will invoke this method to register resource traces + This method should be overriden in the RMs that define traces. + """ + pass @classmethod def _clsinit(cls): - """ ResourceManager child classes have different attributes and traces. - Since the templates that hold the information of attributes and traces - are 'class attribute' dictionaries, initially they all point to the - parent class ResourceManager instances of those dictionaries. - In order to make these templates independent from the parent's one, - it is necessary re-initialize the corresponding dictionaries. - This is the objective of the _clsinit method + """ ResourceManager classes have different attributes and traces. + Attribute and traces are stored in 'class attribute' dictionaries. + When a new ResourceManager class is created, the _clsinit method is + called to create a new instance of those dictionaries and initialize + them. + + The _clsinit method is called by the clsinit decorator method. + """ + # static template for resource attributes cls._attributes = dict() cls._register_attributes() @@ -168,8 +239,12 @@ class ResourceManager(Logger): @classmethod def _clsinit_copy(cls): - """ Same as _clsinit, except that it also inherits all attributes and traces - from the parent class. + """ Same as _clsinit, except that after creating new instances of the + dictionaries it copies all the attributes and traces from the parent + class. + + The _clsinit_copy method is called by the clsinit_copy decorator method. + """ # static template for resource attributes cls._attributes = copy.deepcopy(cls._attributes) @@ -180,7 +255,7 @@ class ResourceManager(Logger): cls._register_traces() @classmethod - def rtype(cls): + def get_rtype(cls): """ Returns the type of the Resource Manager """ @@ -193,6 +268,13 @@ class ResourceManager(Logger): """ return copy.deepcopy(cls._attributes.values()) + @classmethod + def get_attribute(cls, name): + """ Returns a copy of the attribute with name 'name' + + """ + return copy.deepcopy(cls._attributes[name]) + @classmethod def get_traces(cls): """ Returns a copy of the traces @@ -208,15 +290,41 @@ class ResourceManager(Logger): return cls._help @classmethod - def get_backend(cls): - """ Returns the identified of the backend (i.e. testbed, environment) + def get_platform(cls): + """ Returns the identified of the platform (i.e. testbed type) for the Resource """ - return cls._backend + return cls._platform + + @classmethod + def get_global(cls, name): + """ Returns the value of a global attribute + Global attribute meaning an attribute for + all the resources from a rtype + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + global_attr = cls._attributes[name] + return global_attr.value + + @classmethod + def set_global(cls, name, value): + """ Set value for a global attribute + + :param name: Name of the attribute + :type name: str + :param name: Value of the attribute + :type name: str + """ + global_attr = cls._attributes[name] + global_attr.value = value + return value def __init__(self, ec, guid): - super(ResourceManager, self).__init__(self.rtype()) + super(ResourceManager, self).__init__(self.get_rtype()) self._guid = guid self._ec = weakref.ref(ec) @@ -239,11 +347,15 @@ class ResourceManager(Logger): self._provision_time = None self._ready_time = None self._release_time = None - self._finish_time = None self._failed_time = None self._state = ResourceState.NEW + # instance lock to synchronize exclusive state change methods (such + # as deploy and release methods), in order to prevent them from being + # executed at the same time and corrupt internal resource state + self._release_lock = threading.Lock() + @property def guid(self): """ Returns the global unique identifier of the RM """ @@ -251,159 +363,228 @@ class ResourceManager(Logger): @property def ec(self): - """ Returns the Experiment Controller """ + """ Returns the Experiment Controller of the RM """ return self._ec() @property def connections(self): - """ Returns the set of guids of connected RMs""" + """ Returns the set of guids of connected RMs """ return self._connections @property def conditions(self): """ Returns the conditions to which the RM is subjected to. - The object returned by this method is a dictionary indexed by - ResourceAction.""" + This method returns a dictionary of conditions lists indexed by + a ResourceAction. + + """ return self._conditions @property def start_time(self): - """ Returns the start time of the RM as a timestamp""" + """ Returns the start time of the RM as a timestamp """ return self._start_time @property def stop_time(self): - """ Returns the stop time of the RM as a timestamp""" + """ Returns the stop time of the RM as a timestamp """ return self._stop_time @property def discover_time(self): - """ Returns the time discovering was finished for the RM as a timestamp""" + """ Returns the discover time of the RM as a timestamp """ return self._discover_time @property def provision_time(self): - """ Returns the time provisioning was finished for the RM as a timestamp""" + """ Returns the provision time of the RM as a timestamp """ return self._provision_time @property def ready_time(self): - """ Returns the time deployment was finished for the RM as a timestamp""" + """ Returns the deployment time of the RM as a timestamp """ return self._ready_time @property def release_time(self): - """ Returns the release time of the RM as a timestamp""" + """ Returns the release time of the RM as a timestamp """ return self._release_time - @property - def finish_time(self): - """ Returns the finalization time of the RM as a timestamp""" - return self._finish_time - @property def failed_time(self): - """ Returns the time failure occured for the RM as a timestamp""" + """ Returns the time failure occured for the RM as a timestamp """ return self._failed_time @property def state(self): - """ Get the state of the current RM """ + """ Get the current state of the RM """ return self._state + @property + def reschedule_delay(self): + """ Returns default reschedule delay """ + return self._reschedule_delay + def log_message(self, msg): """ Returns the log message formatted with added information. :param msg: text message :type msg: str :rtype: str + """ - return " %s guid: %d - %s " % (self._rtype, self.guid, msg) + return " %s guid %d - %s " % (self._rtype, self.guid, msg) def register_connection(self, guid): """ Registers a connection to the RM identified by guid + This method should not be overriden. Specific functionality + should be added in the do_connect method. + :param guid: Global unique identified of the RM to connect to :type guid: int + """ if self.valid_connection(guid): - self.connect(guid) + self.do_connect(guid) self._connections.add(guid) def unregister_connection(self, guid): """ Removes a registered connection to the RM identified by guid + + This method should not be overriden. Specific functionality + should be added in the do_disconnect method. :param guid: Global unique identified of the RM to connect to :type guid: int + """ if guid in self._connections: - self.disconnect(guid) + self.do_disconnect(guid) self._connections.remove(guid) + @failtrap def discover(self): """ Performs resource discovery. - - This method is resposible for selecting an individual resource + + This method is responsible for selecting an individual resource matching user requirements. - This method should be redefined when necessary in child classes. + + This method should not be overriden directly. Specific functionality + should be added in the do_discover method. + """ - self.set_discovered() + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_discover() + @failtrap def provision(self): """ Performs resource provisioning. - This method is resposible for provisioning one resource. + This method is responsible for provisioning one resource. After this method has been successfully invoked, the resource - should be acccesible/controllable by the RM. - This method should be redefined when necessary in child classes. + should be accessible/controllable by the RM. + + This method should not be overriden directly. Specific functionality + should be added in the do_provision method. + """ - self.set_provisioned() + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_provision() + @failtrap def start(self): - """ Starts the resource. - - There is no generic start behavior for all resources. - This method should be redefined when necessary in child classes. + """ Starts the RM (e.g. launch remote process). + + There is no standard start behavior. Some RMs will not need to perform + any actions upon start. + + This method should not be overriden directly. Specific functionality + should be added in the do_start method. + """ + if not self.state in [ResourceState.READY, ResourceState.STOPPED]: self.error("Wrong state %s for start" % self.state) return - self.set_started() + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_start() + @failtrap def stop(self): - """ Stops the resource. - - There is no generic stop behavior for all resources. - This method should be redefined when necessary in child classes. + """ Interrupts the RM, stopping any tasks the RM was performing. + + There is no standard stop behavior. Some RMs will not need to perform + any actions upon stop. + + This method should not be overriden directly. Specific functionality + should be added in the do_stop method. + """ if not self.state in [ResourceState.STARTED]: self.error("Wrong state %s for stop" % self.state) return - self.set_stopped() + with self._release_lock: + self.do_stop() + @failtrap def deploy(self): - """ Execute all steps required for the RM to reach the state READY + """ Execute all steps required for the RM to reach the state READY. + This method is responsible for deploying the resource (and invoking + the discover and provision methods). + + This method should not be overriden directly. Specific functionality + should be added in the do_deploy method. + """ if self.state > ResourceState.READY: self.error("Wrong state %s for deploy" % self.state) return - self.debug("----- READY ---- ") - self.set_ready() + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_deploy() def release(self): - self.set_released() + """ Perform actions to free resources used by the RM. + + This method is responsible for releasing resources that were + used during the experiment by the RM. + + This method should not be overriden directly. Specific functionality + should be added in the do_release method. + + """ + with self._release_lock: + try: + self.do_release() + except: + self.set_released() + + import traceback + err = traceback.format_exc() + msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % ( + self._rtype, self.guid, err) + logger = Logger(self._rtype) + logger.debug(msg) - def finish(self): - self.set_finished() - def fail(self): - self.set_failed() - self.ec.set_rm_failure() + """ Sets the RM to state FAILED. + + This method should not be overriden directly. Specific functionality + should be added in the do_fail method. + + """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_fail() def set(self, name, value): """ Set the value of the attribute @@ -415,6 +596,7 @@ class ResourceManager(Logger): """ attr = self._attrs[name] attr.value = value + return value def get(self, name): """ Returns the value of the attribute @@ -424,8 +606,43 @@ class ResourceManager(Logger): :rtype: str """ attr = self._attrs[name] + + """ + A.Q. Commenting due to performance impact + if attr.has_flag(Flags.Global): + self.warning( "Attribute %s is global. Use get_global instead." % name) + """ + return attr.value + def has_changed(self, name): + """ Returns the True is the value of the attribute + has been modified by the user. + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + attr = self._attrs[name] + return attr.has_changed + + def has_flag(self, name, flag): + """ Returns true if the attribute has the flag 'flag' + + :param flag: Flag to be checked + :type flag: Flags + """ + attr = self._attrs[name] + return attr.has_flag(flag) + + def has_attribute(self, name): + """ Returns true if the RM has an attribute with name + + :param name: name of the attribute + :type name: string + """ + return name in self._attrs + def enable_trace(self, name): """ Explicitly enable trace generation @@ -541,6 +758,19 @@ class ResourceManager(Logger): connected.append(rm) return connected + def is_rm_instance(self, rtype): + """ Returns True if the RM is instance of 'rtype' + + :param rtype: Type of the RM we look for + :type rtype: str + :return: True|False + """ + rclass = ResourceFactory.get_resource_type(rtype) + if isinstance(self, rclass): + return True + return False + + @failtrap def _needs_reschedule(self, group, state, time): """ Internal method that verify if 'time' has elapsed since all elements in 'group' have reached state 'state'. @@ -558,11 +788,20 @@ class ResourceManager(Logger): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay # check state and time elapsed on all RMs for guid in group: rm = self.ec.get_resource(guid) + + # If one of the RMs this resource needs to wait for has FAILED + # and is critical we raise an exception + if rm.state == ResourceState.FAILED: + if not rm.get('critical'): + continue + msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED" + raise RuntimeError, msg + # If the RM state is lower than the requested state we must # reschedule (e.g. if RM is READY but we required STARTED). if rm.state < state: @@ -582,6 +821,8 @@ class ResourceManager(Logger): t = rm.start_time elif state == ResourceState.STOPPED: t = rm.stop_time + elif state == ResourceState.RELEASED: + t = rm.release_time else: break @@ -616,7 +857,7 @@ class ResourceManager(Logger): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay ## evaluate if set conditions are met @@ -638,12 +879,17 @@ class ResourceManager(Logger): action 'START' are satisfied. """ + #import pdb;pdb.set_trace() + reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay - ## evaluate if set conditions are met - # only can start when RM is either STOPPED or READY + ## evaluate if conditions to start are met + if self.ec.abort: + return + + # Can only start when RM is either STOPPED or READY if self.state not in [ResourceState.STOPPED, ResourceState.READY]: reschedule = True self.debug("---- RESCHEDULING START ---- state %s " % self.state ) @@ -678,13 +924,16 @@ class ResourceManager(Logger): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay - ## evaluate if set conditions are met + ## evaluate if conditions to stop are met + if self.ec.abort: + return # only can stop when RM is STARTED if self.state != ResourceState.STARTED: reschedule = True + self.debug("---- RESCHEDULING STOP ---- state %s " % self.state ) else: self.debug(" ---- STOP CONDITIONS ---- %s" % self.conditions.get(ResourceAction.STOP)) @@ -708,13 +957,16 @@ class ResourceManager(Logger): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay - ## evaluate if set conditions are met + ## evaluate if conditions to deploy are met + if self.ec.abort: + return # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, ResourceState.PROVISIONED]: + #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES! reschedule = True self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state ) else: @@ -729,7 +981,7 @@ class ResourceManager(Logger): #for guid in group: # rm = self.ec.get_resource(guid) # unmet.append((guid, rm._state)) - # + #self.debug("---- WAITED STATES ---- %s" % unmet ) reschedule, delay = self._needs_reschedule(group, state, time) @@ -739,16 +991,16 @@ class ResourceManager(Logger): if reschedule: self.ec.schedule(delay, self.deploy_with_conditions) else: - self.debug("----- STARTING ---- ") + self.debug("----- DEPLOYING ---- ") self.deploy() - def connect(self, guid): + def do_connect(self, guid): """ Performs actions that need to be taken upon associating RMs. This method should be redefined when necessary in child classes. """ pass - def disconnect(self, guid): + def do_disconnect(self, guid): """ Performs actions that need to be taken upon disassociating RMs. This method should be redefined when necessary in child classes. """ @@ -766,46 +1018,84 @@ class ResourceManager(Logger): """ # TODO: Validate! return True - - def set_started(self): + + def do_discover(self): + self.set_discovered() + + def do_provision(self): + self.set_provisioned() + + def do_start(self): + self.set_started() + + def do_stop(self): + self.set_stopped() + + def do_deploy(self): + self.set_ready() + + def do_release(self): + self.set_released() + + def do_fail(self): + self.set_failed() + self.ec.inform_failure(self.guid) + + def set_started(self, time = None): """ Mark ResourceManager as STARTED """ - self._start_time = tnow() - self._state = ResourceState.STARTED - - def set_stopped(self): + self.set_state(ResourceState.STARTED, "_start_time", time) + self.debug("----- STARTED ---- ") + + def set_stopped(self, time = None): """ Mark ResourceManager as STOPPED """ - self._stop_time = tnow() - self._state = ResourceState.STOPPED + self.set_state(ResourceState.STOPPED, "_stop_time", time) + self.debug("----- STOPPED ---- ") - def set_ready(self): + def set_ready(self, time = None): """ Mark ResourceManager as READY """ - self._ready_time = tnow() - self._state = ResourceState.READY + self.set_state(ResourceState.READY, "_ready_time", time) + self.debug("----- READY ---- ") - def set_released(self): + def set_released(self, time = None): """ Mark ResourceManager as REALEASED """ - self._release_time = tnow() - self._state = ResourceState.RELEASED + self.set_state(ResourceState.RELEASED, "_release_time", time) - def set_finished(self): - """ Mark ResourceManager as FINISHED """ - self._finish_time = tnow() - self._state = ResourceState.FINISHED + msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid) + logger = Logger(self._rtype) + logger.debug(msg) - def set_failed(self): + def set_failed(self, time = None): """ Mark ResourceManager as FAILED """ - self._failed_time = tnow() - self._state = ResourceState.FAILED + self.set_state(ResourceState.FAILED, "_failed_time", time) + + msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid) + logger = Logger(self._rtype) + logger.debug(msg) - def set_discovered(self): + def set_discovered(self, time = None): """ Mark ResourceManager as DISCOVERED """ - self._discover_time = tnow() - self._state = ResourceState.DISCOVERED + self.set_state(ResourceState.DISCOVERED, "_discover_time", time) + self.debug("----- DISCOVERED ---- ") - def set_provisioned(self): + def set_provisioned(self, time = None): """ Mark ResourceManager as PROVISIONED """ - self._provision_time = tnow() - self._state = ResourceState.PROVISIONED + self.set_state(ResourceState.PROVISIONED, "_provision_time", time) + self.debug("----- PROVISIONED ---- ") + + def set_state(self, state, state_time_attr, time = None): + """ Set the state of the RM while keeping a trace of the time """ + + # Ensure that RM state will not change after released + if self._state == ResourceState.RELEASED: + return + + time = time or tnow() + self.set_state_time(state, state_time_attr, time) + + def set_state_time(self, state, state_time_attr, time): + """ Set the time for the RM state change """ + setattr(self, state_time_attr, time) + self._state = state class ResourceFactory(object): _resource_types = dict() @@ -823,7 +1113,7 @@ class ResourceFactory(object): @classmethod def register_type(cls, rclass): """Register a new Ressource Manager""" - cls._resource_types[rclass.rtype()] = rclass + cls._resource_types[rclass.get_rtype()] = rclass @classmethod def create(cls, rtype, ec, guid): @@ -832,7 +1122,7 @@ class ResourceFactory(object): return rclass(ec, guid) def populate_factory(): - """Register all the possible RM that exists in the current version of Nepi. + """Find and rgister all available RMs """ # Once the factory is populated, don't repopulate if not ResourceFactory.resource_types(): @@ -851,7 +1141,7 @@ def find_types(): path = os.path.dirname(nepi.resources.__file__) search_path.add(path) - types = [] + types = set() for importer, modname, ispkg in pkgutil.walk_packages(search_path, prefix = "nepi.resources."): @@ -859,7 +1149,7 @@ def find_types(): loader = importer.find_module(modname) try: - # Notice: Repeated calls to load_module will act as a reload of teh module + # Notice: Repeated calls to load_module will act as a reload of the module if modname in sys.modules: module = sys.modules.get(modname) else: @@ -878,7 +1168,7 @@ def find_types(): continue if issubclass(attr, ResourceManager): - types.append(attr) + types.add(attr) if not modname in sys.modules: sys.modules[modname] = module @@ -892,4 +1182,3 @@ def find_types(): return types -