X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fresource.py;h=df359f0fed842d25170ec0b42cbdaaceb61bc339;hb=2f73c5b427909b016a438b372d17fb15d2d51ede;hp=325895ab7083a2dfa8edac19fcbf9a0ce942eb1b;hpb=b7a7af6c58676d54726365c00c15106ba8784214;p=nepi.git diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 325895ab..df359f0f 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -51,9 +51,8 @@ class ResourceState: READY = 3 STARTED = 4 STOPPED = 5 - FINISHED = 6 - FAILED = 7 - RELEASED = 8 + FAILED = 6 + RELEASED = 7 ResourceState2str = dict({ ResourceState.NEW : "NEW", @@ -62,7 +61,6 @@ ResourceState2str = dict({ ResourceState.READY : "READY", ResourceState.STARTED : "STARTED", ResourceState.STOPPED : "STOPPED", - ResourceState.FINISHED : "FINISHED", ResourceState.FAILED : "FAILED", ResourceState.RELEASED : "RELEASED", }) @@ -108,7 +106,7 @@ def clsinit_copy(cls): def failtrap(func): """ Decorator function for instance methods that should set the RM state to FAILED when an error is raised. The methods that must be - decorated are: discover, provision, deploy, start, stop and finish. + decorated are: discover, provision, deploy, start, stop. """ def wrapped(self, *args, **kwargs): @@ -188,16 +186,24 @@ class ResourceManager(Logger): attributes. """ - critical = Attribute("critical", "Defines whether the resource is critical. " "A failure on a critical resource will interrupt " "the experiment. ", type = Types.Bool, default = True, - flags = Flags.ExecReadOnly) + flags = Flags.Design) + hard_release = Attribute("hardRelease", + "Forces removal of all result files and directories associated " + "to the RM upon resource release. After release the RM will " + "be removed from the EC and the results will not longer be " + "accessible", + type = Types.Bool, + default = False, + flags = Flags.Design) cls._register_attribute(critical) + cls._register_attribute(hard_release) @classmethod def _register_traces(cls): @@ -248,7 +254,7 @@ class ResourceManager(Logger): cls._register_traces() @classmethod - def rtype(cls): + def get_rtype(cls): """ Returns the type of the Resource Manager """ @@ -261,6 +267,14 @@ class ResourceManager(Logger): """ return copy.deepcopy(cls._attributes.values()) + @classmethod + def get_attribute(cls, name): + """ Returns a copy of the attribute with name 'name' + + """ + return copy.deepcopy(cls._attributes[name]) + + @classmethod def get_traces(cls): """ Returns a copy of the traces @@ -283,8 +297,34 @@ class ResourceManager(Logger): """ return cls._backend + @classmethod + def get_global(cls, name): + """ Returns the value of a global attribute + Global attribute meaning an attribute for + all the resources from a rtype + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + global_attr = cls._attributes[name] + return global_attr.value + + @classmethod + def set_global(cls, name, value): + """ Set value for a global attribute + + :param name: Name of the attribute + :type name: str + :param name: Value of the attribute + :type name: str + """ + global_attr = cls._attributes[name] + global_attr.value = value + return value + def __init__(self, ec, guid): - super(ResourceManager, self).__init__(self.rtype()) + super(ResourceManager, self).__init__(self.get_rtype()) self._guid = guid self._ec = weakref.ref(ec) @@ -307,7 +347,6 @@ class ResourceManager(Logger): self._provision_time = None self._ready_time = None self._release_time = None - self._finish_time = None self._failed_time = None self._state = ResourceState.NEW @@ -372,11 +411,6 @@ class ResourceManager(Logger): """ Returns the release time of the RM as a timestamp """ return self._release_time - @property - def finish_time(self): - """ Returns the finalization time of the RM as a timestamp """ - return self._finish_time - @property def failed_time(self): """ Returns the time failure occured for the RM as a timestamp """ @@ -395,7 +429,7 @@ class ResourceManager(Logger): :rtype: str """ - return " %s guid: %d - %s " % (self._rtype, self.guid, msg) + return " %s guid %d - %s " % (self._rtype, self.guid, msg) def register_connection(self, guid): """ Registers a connection to the RM identified by guid @@ -512,7 +546,6 @@ class ResourceManager(Logger): with self._release_lock: if self._state != ResourceState.RELEASED: self.do_deploy() - self.debug("----- READY ---- ") def release(self): """ Perform actions to free resources used by the RM. @@ -532,25 +565,7 @@ class ResourceManager(Logger): err = traceback.format_exc() self.error(err) - self.set_released() - self.debug("----- RELEASED ---- ") - - @failtrap - def finish(self): - """ Sets the RM to state FINISHED. - - The FINISHED state is different from STOPPED state in that it - should not be directly invoked by the user. - STOPPED indicates that the user interrupted the RM, FINISHED means - that the RM concluded normally the actions it was supposed to perform. - - This method should not be overriden directly. Specific functionality - should be added in the do_finish method. - - """ - with self._release_lock: - if self._state != ResourceState.RELEASED: - self.do_finish() + self.set_released() def fail(self): """ Sets the RM to state FAILED. @@ -573,6 +588,7 @@ class ResourceManager(Logger): """ attr = self._attrs[name] attr.value = value + return value def get(self, name): """ Returns the value of the attribute @@ -582,8 +598,39 @@ class ResourceManager(Logger): :rtype: str """ attr = self._attrs[name] + if attr.has_flag(Flags.Global): + self.warning( "Attribute %s is global. Use get_global instead." % name) + return attr.value + def has_changed(self, name): + """ Returns the True is the value of the attribute + has been modified by the user. + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + attr = self._attrs[name] + return attr.has_changed() + + def has_flag(self, name, flag): + """ Returns true if the attribute has the flag 'flag' + + :param flag: Flag to be checked + :type flag: Flags + """ + attr = self._attrs[name] + return attr.has_flag(flag) + + def has_attribute(self, name): + """ Returns true if the RM has an attribute with name + + :param name: name of the attribute + :type name: string + """ + return name in self._attrs + def enable_trace(self, name): """ Explicitly enable trace generation @@ -724,8 +771,10 @@ class ResourceManager(Logger): rm = self.ec.get_resource(guid) # If one of the RMs this resource needs to wait for has FAILED - # we raise an exception + # and is critical we raise an exception if rm.state == ResourceState.FAILED: + if not rm.get('critical'): + continue msg = "Resource can not wait for FAILED RM %d. Setting Resource to FAILED" raise RuntimeError, msg @@ -748,8 +797,6 @@ class ResourceManager(Logger): t = rm.start_time elif state == ResourceState.STOPPED: t = rm.stop_time - elif state == ResourceState.FINISHED: - t = rm.finish_time elif state == ResourceState.RELEASED: t = rm.release_time else: @@ -909,7 +956,7 @@ class ResourceManager(Logger): #for guid in group: # rm = self.ec.get_resource(guid) # unmet.append((guid, rm._state)) - # + #self.debug("---- WAITED STATES ---- %s" % unmet ) reschedule, delay = self._needs_reschedule(group, state, time) @@ -963,57 +1010,59 @@ class ResourceManager(Logger): self.set_ready() def do_release(self): - pass - - def do_finish(self): - # In case the RM passed from STARTED directly to FINISHED, - # we set the stop_time for consistency - if self.stop_time == None: - self.set_stopped() - - self.set_finished() + self.set_released() def do_fail(self): self.set_failed() - def set_started(self): + def set_started(self, time = None): """ Mark ResourceManager as STARTED """ - self.set_state(ResourceState.STARTED, "_start_time") - - def set_stopped(self): + self.set_state(ResourceState.STARTED, "_start_time", time) + self.debug("----- STARTED ---- ") + + def set_stopped(self, time = None): """ Mark ResourceManager as STOPPED """ - self.set_state(ResourceState.STOPPED, "_stop_time") + self.set_state(ResourceState.STOPPED, "_stop_time", time) + self.debug("----- STOPPED ---- ") - def set_ready(self): + def set_ready(self, time = None): """ Mark ResourceManager as READY """ - self.set_state(ResourceState.READY, "_ready_time") + self.set_state(ResourceState.READY, "_ready_time", time) + self.debug("----- READY ---- ") - def set_released(self): + def set_released(self, time = None): """ Mark ResourceManager as REALEASED """ - self.set_state(ResourceState.RELEASED, "_release_time") - - def set_finished(self): - """ Mark ResourceManager as FINISHED """ - self.set_state(ResourceState.FINISHED, "_finish_time") + self.set_state(ResourceState.RELEASED, "_release_time", time) + self.debug("----- RELEASED ---- ") - def set_failed(self): + def set_failed(self, time = None): """ Mark ResourceManager as FAILED """ - self.set_state(ResourceState.FAILED, "_failed_time") + self.set_state(ResourceState.FAILED, "_failed_time", time) + self.debug("----- FAILED ---- ") - def set_discovered(self): + def set_discovered(self, time = None): """ Mark ResourceManager as DISCOVERED """ - self.set_state(ResourceState.DISCOVERED, "_discover_time") + self.set_state(ResourceState.DISCOVERED, "_discover_time", time) + self.debug("----- DISCOVERED ---- ") - def set_provisioned(self): + def set_provisioned(self, time = None): """ Mark ResourceManager as PROVISIONED """ - self.set_state(ResourceState.PROVISIONED, "_provision_time") + self.set_state(ResourceState.PROVISIONED, "_provision_time", time) + self.debug("----- PROVISIONED ---- ") + + def set_state(self, state, state_time_attr, time = None): + """ Set the state of the RM while keeping a trace of the time """ - def set_state(self, state, state_time_attr): # Ensure that RM state will not change after released if self._state == ResourceState.RELEASED: return - - setattr(self, state_time_attr, tnow()) + + time = time or tnow() + self.set_state_time(state, state_time_attr, time) + + def set_state_time(self, state, state_time_attr, time): + """ Set the time for the RM state change """ + setattr(self, state_time_attr, time) self._state = state class ResourceFactory(object): @@ -1032,7 +1081,7 @@ class ResourceFactory(object): @classmethod def register_type(cls, rclass): """Register a new Ressource Manager""" - cls._resource_types[rclass.rtype()] = rclass + cls._resource_types[rclass.get_rtype()] = rclass @classmethod def create(cls, rtype, ec, guid): @@ -1060,7 +1109,7 @@ def find_types(): path = os.path.dirname(nepi.resources.__file__) search_path.add(path) - types = [] + types = set() for importer, modname, ispkg in pkgutil.walk_packages(search_path, prefix = "nepi.resources."): @@ -1068,7 +1117,7 @@ def find_types(): loader = importer.find_module(modname) try: - # Notice: Repeated calls to load_module will act as a reload of teh module + # Notice: Repeated calls to load_module will act as a reload of the module if modname in sys.modules: module = sys.modules.get(modname) else: @@ -1087,7 +1136,7 @@ def find_types(): continue if issubclass(attr, ResourceManager): - types.append(attr) + types.add(attr) if not modname in sys.modules: sys.modules[modname] = module @@ -1101,4 +1150,3 @@ def find_types(): return types -