From: Alina Quereilhac Date: Fri, 25 Jul 2014 08:56:52 +0000 (+0200) Subject: Optimized abort and wait in the EC X-Git-Tag: nepi-3.2.0~118^2~2 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=6be239a7c874733f31bd0d1085f5d52072ee411d;p=nepi.git Optimized abort and wait in the EC --- diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index a63c40d7..cf0e0bb7 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -51,6 +51,7 @@ class FailureManager(object): def __init__(self, ec): self._ec = weakref.ref(ec) self._failure_level = FailureLevel.OK + self._abort = False @property def ec(self): @@ -62,23 +63,19 @@ class FailureManager(object): @property def abort(self): + return self._abort + + def eval_failure(self, guid): if self._failure_level == FailureLevel.OK: - for guid in self.ec.resources: - try: - state = self.ec.state(guid) - critical = self.ec.get(guid, "critical") - if state == ResourceState.FAILED and critical: - self._failure_level = FailureLevel.RM_FAILURE - self.ec.logger.debug("RM critical failure occurred on guid %d." \ - " Setting EC FAILURE LEVEL to RM_FAILURE" % guid) - break - except: - # An error might occure because a RM was deleted abruptly. - # In this case the error should be ignored. - if guid in self.ec._resources: - raise - - return self._failure_level != FailureLevel.OK + rm = self.get_resource(guid) + state = rm.state + critical = rm.get("critical") + + if state == ResourceState.FAILED and critical: + self._failure_level = FailureLevel.RM_FAILURE + self._abort = True + self.ec.logger.debug("RM critical failure occurred on guid %d." 
\ + " Setting EC FAILURE LEVEL to RM_FAILURE" % guid) def set_ec_failure(self): self._failure_level = FailureLevel.EC_FAILURE @@ -257,6 +254,16 @@ class ExperimentController(object): """ return self._fm.abort + def inform_failure(self, guid): + """ Reports a failure in a RM to the EC for evaluation + + :param guid: Resource id + :type guid: int + + """ + + return self._fm.eval_failure(guid) + def wait_finished(self, guids): """ Blocking method that waits until all RMs in the 'guids' list have reached a state >= STOPPED (i.e. STOPPED, FAILED or @@ -343,21 +350,20 @@ class ExperimentController(object): break # If a guid reached one of the target states, remove it from list - guid = guids[0] - rstate = self.state(guid) + guid = guids.pop() + rm = self.get_resource(guid) + rstate = rm.state - hrrstate = ResourceState2str.get(rstate) - hrstate = ResourceState2str.get(state) - if rstate >= state: - guids.remove(guid) - rm = self.get_resource(guid) self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % ( - rm.get_rtype(), guid, hrrstate, hrstate)) + rm.get_rtype(), guid, rstate, state)) else: # Debug... 
self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % ( - guid, hrrstate, hrstate)) + guid, rstate, state)) + + guids.append(guid) + time.sleep(0.5) def get_task(self, tid): @@ -798,8 +804,8 @@ class ExperimentController(object): if not guids: # If no guids list was passed, all 'NEW' RMs will be deployed guids = [] - for guid in self.resources: - if self.state(guid) == ResourceState.NEW: + for guid, rm in self._resources.iteritems(): + if rm.state == ResourceState.NEW: guids.append(guid) if isinstance(guids, int): diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 36816b56..e82ced04 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -274,7 +274,6 @@ class ResourceManager(Logger): """ return copy.deepcopy(cls._attributes[name]) - @classmethod def get_traces(cls): """ Returns a copy of the traces @@ -598,8 +597,12 @@ class ResourceManager(Logger): :rtype: str """ attr = self._attrs[name] + + """ + A.Q. Commenting due to performance impact if attr.has_flag(Flags.Global): self.warning( "Attribute %s is global. Use get_global instead." % name) + """ return attr.value @@ -877,12 +880,12 @@ class ResourceManager(Logger): # Verify all start conditions are met for (group, state, time) in start_conditions: # Uncomment for debug - unmet = [] - for guid in group: - rm = self.ec.get_resource(guid) - unmet.append((guid, rm._state)) - - self.debug("---- WAITED STATES ---- %s" % unmet ) + #unmet = [] + #for guid in group: + # rm = self.ec.get_resource(guid) + # unmet.append((guid, rm._state)) + # + #self.debug("---- WAITED STATES ---- %s" % unmet ) reschedule, delay = self._needs_reschedule(group, state, time) if reschedule: @@ -942,6 +945,7 @@ class ResourceManager(Logger): # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, ResourceState.PROVISIONED]: + #### XXX: A.Q. 
IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES! reschedule = True self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state ) else: @@ -1014,6 +1018,7 @@ class ResourceManager(Logger): def do_fail(self): self.set_failed() + self.ec.inform_failure(self.guid) def set_started(self, time = None): """ Mark ResourceManager as STARTED """