X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fresource.py;h=16b434fbddb8c730010f329a842ea011756c9b38;hb=0f3a2841944186b6076a8c230b98eb8b20d7ae45;hp=059a758550af1f919fd99dc6553f6a712b36fb19;hpb=fbc7c758e1464d3af686062505fde706fc92b47c;p=nepi.git diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 059a7585..16b434fb 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -31,8 +31,6 @@ import sys import threading import weakref -reschedule_delay = "1s" - class ResourceAction: """ Action that a user can order to a Resource Manager @@ -47,16 +45,18 @@ class ResourceState: """ NEW = 0 DISCOVERED = 1 - PROVISIONED = 2 - READY = 3 - STARTED = 4 - STOPPED = 5 - FAILED = 6 - RELEASED = 7 + RESERVED = 2 + PROVISIONED = 3 + READY = 4 + STARTED = 5 + STOPPED = 6 + FAILED = 7 + RELEASED = 8 ResourceState2str = dict({ ResourceState.NEW : "NEW", ResourceState.DISCOVERED : "DISCOVERED", + ResourceState.RESERVED : "RESERVED", ResourceState.PROVISIONED : "PROVISIONED", ResourceState.READY : "READY", ResourceState.STARTED : "STARTED", @@ -106,18 +106,20 @@ def clsinit_copy(cls): def failtrap(func): """ Decorator function for instance methods that should set the RM state to FAILED when an error is raised. The methods that must be - decorated are: discover, provision, deploy, start, stop. + decorated are: discover, reserved, provision, deploy, start, stop. """ def wrapped(self, *args, **kwargs): try: return func(self, *args, **kwargs) except: + self.fail() + import traceback err = traceback.format_exc() - self.error(err) - self.debug("SETTING guid %d to state FAILED" % self.guid) - self.fail() + logger = Logger(self._rtype) + logger.error(err) + logger.error("SETTING guid %d to state FAILED" % self.guid) raise return wrapped @@ -127,19 +129,20 @@ class ResourceManager(Logger): """ Base clase for all ResourceManagers. A ResourceManger is specific to a resource type (e.g. Node, - Switch, Application, etc) on a specific backend (e.g. PlanetLab, + Switch, Application, etc) on a specific platform (e.g. PlanetLab, OMF, etc). The ResourceManager instances are responsible for interacting with and controlling concrete (physical or virtual) resources in the - experimental backends. + experimental platforms. """ _rtype = "Resource" _attributes = None _traces = None _help = None - _backend = None + _platform = None + _reschedule_delay = "0.5s" @classmethod def _register_attribute(cls, attr): @@ -186,16 +189,24 @@ class ResourceManager(Logger): attributes. """ - critical = Attribute("critical", "Defines whether the resource is critical. " "A failure on a critical resource will interrupt " "the experiment. ", type = Types.Bool, default = True, - flags = Flags.ExecReadOnly) + flags = Flags.Design) + hard_release = Attribute("hardRelease", + "Forces removal of all result files and directories associated " + "to the RM upon resource release. After release the RM will " + "be removed from the EC and the results will not longer be " + "accessible", + type = Types.Bool, + default = False, + flags = Flags.Design) cls._register_attribute(critical) + cls._register_attribute(hard_release) @classmethod def _register_traces(cls): @@ -266,7 +277,6 @@ class ResourceManager(Logger): """ return copy.deepcopy(cls._attributes[name]) - @classmethod def get_traces(cls): """ Returns a copy of the traces @@ -282,12 +292,38 @@ class ResourceManager(Logger): return cls._help @classmethod - def get_backend(cls): - """ Returns the identified of the backend (i.e. testbed, environment) + def get_platform(cls): + """ Returns the identified of the platform (i.e. testbed type) for the Resource """ - return cls._backend + return cls._platform + + @classmethod + def get_global(cls, name): + """ Returns the value of a global attribute + Global attribute meaning an attribute for + all the resources from a rtype + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + global_attr = cls._attributes[name] + return global_attr.value + + @classmethod + def set_global(cls, name, value): + """ Set value for a global attribute + + :param name: Name of the attribute + :type name: str + :param name: Value of the attribute + :type name: str + """ + global_attr = cls._attributes[name] + global_attr.value = value + return value def __init__(self, ec, guid): super(ResourceManager, self).__init__(self.get_rtype()) @@ -310,6 +346,7 @@ class ResourceManager(Logger): self._start_time = None self._stop_time = None self._discover_time = None + self._reserved_time = None self._provision_time = None self._ready_time = None self._release_time = None @@ -319,7 +356,7 @@ class ResourceManager(Logger): # instance lock to synchronize exclusive state change methods (such # as deploy and release methods), in order to prevent them from being - # executed at the same time + # executed at the same time and corrupt internal resource state self._release_lock = threading.Lock() @property @@ -362,6 +399,11 @@ class ResourceManager(Logger): """ Returns the discover time of the RM as a timestamp """ return self._discover_time + @property + def reserved_time(self): + """ Returns the resreved time of the RM as a timestamp """ + return self._reserved_time + @property def provision_time(self): """ Returns the provision time of the RM as a timestamp """ @@ -387,6 +429,11 @@ class ResourceManager(Logger): """ Get the current state of the RM """ return self._state + @property + def reschedule_delay(self): + """ Returns default reschedule delay """ + return self._reschedule_delay + def log_message(self, msg): """ Returns the log message formatted with added information. @@ -395,7 +442,7 @@ class ResourceManager(Logger): :rtype: str """ - return " %s guid: %d - %s " % (self._rtype, self.guid, msg) + return " %s guid %d - %s " % (self._rtype, self.guid, msg) def register_connection(self, guid): """ Registers a connection to the RM identified by guid @@ -440,6 +487,21 @@ class ResourceManager(Logger): if self._state != ResourceState.RELEASED: self.do_discover() + @failtrap + def reserve(self): + """ Performs resource reserve. + + This method is responsible for reserving an individual resource + matching user requirements. + + This method should not be overriden directly. Specific functionality + should be added in the do_reserved method. + + """ + with self._release_lock: + if self._state != ResourceState.RELEASED: + self.do_reserve() + @failtrap def provision(self): """ Performs resource provisioning. @@ -512,7 +574,6 @@ class ResourceManager(Logger): with self._release_lock: if self._state != ResourceState.RELEASED: self.do_deploy() - self.debug("----- READY ---- ") def release(self): """ Perform actions to free resources used by the RM. @@ -528,12 +589,14 @@ class ResourceManager(Logger): try: self.do_release() except: + self.set_released() + import traceback err = traceback.format_exc() - self.error(err) - - self.set_released() - self.debug("----- RELEASED ---- ") + msg = " %s guid %d ----- FAILED TO RELEASE ----- \n %s " % ( + self._rtype, self.guid, err) + logger = Logger(self._rtype) + logger.debug(msg) def fail(self): """ Sets the RM to state FAILED. @@ -556,6 +619,7 @@ class ResourceManager(Logger): """ attr = self._attrs[name] attr.value = value + return value def get(self, name): """ Returns the value of the attribute @@ -565,8 +629,43 @@ class ResourceManager(Logger): :rtype: str """ attr = self._attrs[name] + + """ + A.Q. Commenting due to performance impact + if attr.has_flag(Flags.Global): + self.warning( "Attribute %s is global. Use get_global instead." % name) + """ + return attr.value + def has_changed(self, name): + """ Returns the True is the value of the attribute + has been modified by the user. + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + attr = self._attrs[name] + return attr.has_changed + + def has_flag(self, name, flag): + """ Returns true if the attribute has the flag 'flag' + + :param flag: Flag to be checked + :type flag: Flags + """ + attr = self._attrs[name] + return attr.has_flag(flag) + + def has_attribute(self, name): + """ Returns true if the RM has an attribute with name + + :param name: name of the attribute + :type name: string + """ + return name in self._attrs + def enable_trace(self, name): """ Explicitly enable trace generation @@ -682,6 +781,18 @@ class ResourceManager(Logger): connected.append(rm) return connected + def is_rm_instance(self, rtype): + """ Returns True if the RM is instance of 'rtype' + + :param rtype: Type of the RM we look for + :type rtype: str + :return: True|False + """ + rclass = ResourceFactory.get_resource_type(rtype) + if isinstance(self, rclass): + return True + return False + @failtrap def _needs_reschedule(self, group, state, time): """ Internal method that verify if 'time' has elapsed since @@ -700,7 +811,7 @@ class ResourceManager(Logger): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay # check state and time elapsed on all RMs for guid in group: @@ -725,7 +836,9 @@ class ResourceManager(Logger): if time: if state == ResourceState.DISCOVERED: t = rm.discover_time - if state == ResourceState.PROVISIONED: + elif state == ResourceState.RESERVED: + t = rm.reserved_time + elif state == ResourceState.PROVISIONED: t = rm.provision_time elif state == ResourceState.READY: t = rm.ready_time @@ -769,7 +882,7 @@ class ResourceManager(Logger): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay ## evaluate if set conditions are met @@ -794,7 +907,7 @@ class ResourceManager(Logger): #import pdb;pdb.set_trace() reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay ## evaluate if conditions to start are met @@ -813,12 +926,12 @@ class ResourceManager(Logger): # Verify all start conditions are met for (group, state, time) in start_conditions: # Uncomment for debug - unmet = [] - for guid in group: - rm = self.ec.get_resource(guid) - unmet.append((guid, rm._state)) - - self.debug("---- WAITED STATES ---- %s" % unmet ) + #unmet = [] + #for guid in group: + # rm = self.ec.get_resource(guid) + # unmet.append((guid, rm._state)) + # + #self.debug("---- WAITED STATES ---- %s" % unmet ) reschedule, delay = self._needs_reschedule(group, state, time) if reschedule: @@ -836,7 +949,7 @@ class ResourceManager(Logger): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay ## evaluate if conditions to stop are met if self.ec.abort: @@ -869,15 +982,16 @@ class ResourceManager(Logger): """ reschedule = False - delay = reschedule_delay + delay = self.reschedule_delay ## evaluate if conditions to deploy are met if self.ec.abort: return # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED - if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, - ResourceState.PROVISIONED]: + if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, + ResourceState.RESERVED, ResourceState.PROVISIONED]: + #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES! reschedule = True self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state ) else: @@ -933,6 +1047,9 @@ class ResourceManager(Logger): def do_discover(self): self.set_discovered() + def do_reserve(self): + self.set_reserved() + def do_provision(self): self.set_provisioned() @@ -946,47 +1063,71 @@ class ResourceManager(Logger): self.set_ready() def do_release(self): - pass + self.set_released() def do_fail(self): self.set_failed() + self.ec.inform_failure(self.guid) - def set_started(self): + def set_started(self, time = None): """ Mark ResourceManager as STARTED """ - self.set_state(ResourceState.STARTED, "_start_time") - - def set_stopped(self): + self.set_state(ResourceState.STARTED, "_start_time", time) + self.debug("----- STARTED ---- ") + + def set_stopped(self, time = None): """ Mark ResourceManager as STOPPED """ - self.set_state(ResourceState.STOPPED, "_stop_time") + self.set_state(ResourceState.STOPPED, "_stop_time", time) + self.debug("----- STOPPED ---- ") - def set_ready(self): + def set_ready(self, time = None): """ Mark ResourceManager as READY """ - self.set_state(ResourceState.READY, "_ready_time") + self.set_state(ResourceState.READY, "_ready_time", time) + self.debug("----- READY ---- ") - def set_released(self): + def set_released(self, time = None): """ Mark ResourceManager as REALEASED """ - self.set_state(ResourceState.RELEASED, "_release_time") + self.set_state(ResourceState.RELEASED, "_release_time", time) + + msg = " %s guid %d ----- RELEASED ----- " % (self._rtype, self.guid) + logger = Logger(self._rtype) + logger.debug(msg) - def set_failed(self): + def set_failed(self, time = None): """ Mark ResourceManager as FAILED """ - self.set_state(ResourceState.FAILED, "_failed_time") + self.set_state(ResourceState.FAILED, "_failed_time", time) - def set_discovered(self): + msg = " %s guid %d ----- FAILED ----- " % (self._rtype, self.guid) + logger = Logger(self._rtype) + logger.debug(msg) + + def set_discovered(self, time = None): """ Mark ResourceManager as DISCOVERED """ - self.set_state(ResourceState.DISCOVERED, "_discover_time") + self.set_state(ResourceState.DISCOVERED, "_discover_time", time) + self.debug("----- DISCOVERED ---- ") + + def set_reserved(self, time = None): + """ Mark ResourceManager as RESERVED """ + self.set_state(ResourceState.RESERVED, "_reserved_time", time) + self.debug("----- RESERVED ---- ") - def set_provisioned(self): + def set_provisioned(self, time = None): """ Mark ResourceManager as PROVISIONED """ - self.set_state(ResourceState.PROVISIONED, "_provision_time") + self.set_state(ResourceState.PROVISIONED, "_provision_time", time) + self.debug("----- PROVISIONED ---- ") - def set_state(self, state, state_time_attr): + def set_state(self, state, state_time_attr, time = None): """ Set the state of the RM while keeping a trace of the time """ # Ensure that RM state will not change after released if self._state == ResourceState.RELEASED: return - - setattr(self, state_time_attr, tnow()) + + time = time or tnow() + self.set_state_time(state, state_time_attr, time) + + def set_state_time(self, state, state_time_attr, time): + """ Set the time for the RM state change """ + setattr(self, state_time_attr, time) self._state = state class ResourceFactory(object): @@ -1014,7 +1155,7 @@ class ResourceFactory(object): return rclass(ec, guid) def populate_factory(): - """Register all the possible RM that exists in the current version of Nepi. + """Find and rgister all available RMs """ # Once the factory is populated, don't repopulate if not ResourceFactory.resource_types(): @@ -1033,7 +1174,7 @@ def find_types(): path = os.path.dirname(nepi.resources.__file__) search_path.add(path) - types = [] + types = set() for importer, modname, ispkg in pkgutil.walk_packages(search_path, prefix = "nepi.resources."): @@ -1041,7 +1182,7 @@ def find_types(): loader = importer.find_module(modname) try: - # Notice: Repeated calls to load_module will act as a reload of teh module + # Notice: Repeated calls to load_module will act as a reload of the module if modname in sys.modules: module = sys.modules.get(modname) else: @@ -1060,10 +1201,7 @@ def find_types(): continue if issubclass(attr, ResourceManager): - if find(attr.get_rtype().lower(), "abstract") > -1: - continue - - types.append(attr) + types.add(attr) if not modname in sys.modules: sys.modules[modname] = module @@ -1077,4 +1215,3 @@ def find_types(): return types -