Oprimized abort and wait in the EC
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 25 Jul 2014 08:56:52 +0000 (10:56 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Fri, 25 Jul 2014 08:56:52 +0000 (10:56 +0200)
src/nepi/execution/ec.py
src/nepi/execution/resource.py

index a63c40d..cf0e0bb 100644 (file)
@@ -51,6 +51,7 @@ class FailureManager(object):
     def __init__(self, ec):
         self._ec = weakref.ref(ec)
         self._failure_level = FailureLevel.OK
+        self._abort = False
 
     @property
     def ec(self):
@@ -62,23 +63,19 @@ class FailureManager(object):
 
     @property
     def abort(self):
+        return self._abort
+
+    def eval_failure(self, guid):
         if self._failure_level == FailureLevel.OK:
-            for guid in self.ec.resources:
-                try:
-                    state = self.ec.state(guid)
-                    critical = self.ec.get(guid, "critical")
-                    if state == ResourceState.FAILED and critical:
-                        self._failure_level = FailureLevel.RM_FAILURE
-                        self.ec.logger.debug("RM critical failure occurred on guid %d." \
-                                " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
-                        break
-                except:
-                    # An error might occure because a RM was deleted abruptly.
-                    # In this case the error should be ignored.
-                    if guid in self.ec._resources:
-                        raise
-
-        return self._failure_level != FailureLevel.OK
+            rm = self.get_resource(guid)
+            state = rm.state
+            critical = rm.get("critical")
+
+            if state == ResourceState.FAILED and critical:
+                self._failure_level = FailureLevel.RM_FAILURE
+                self._abort = True
+                self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                    " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
@@ -257,6 +254,16 @@ class ExperimentController(object):
         """
         return self._fm.abort
 
+    def inform_failure(self, guid):
+        """ Reports a failure in a RM to the EC for evaluation
+
+            :param guid: Resource id
+            :type guid: int
+
+        """
+
+        return self._fm.eval_failure(guid)
+
     def wait_finished(self, guids):
         """ Blocking method that waits until all RMs in the 'guids' list 
         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
@@ -343,21 +350,20 @@ class ExperimentController(object):
                 break
 
             # If a guid reached one of the target states, remove it from list
-            guid = guids[0]
-            rstate = self.state(guid)
+            guid = guids.pop()
+            rm = self.get_resource(guid)
+            rstate = rm.state
             
-            hrrstate = ResourceState2str.get(rstate)
-            hrstate = ResourceState2str.get(state)
-
             if rstate >= state:
-                guids.remove(guid)
-                rm = self.get_resource(guid)
                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
-                    rm.get_rtype(), guid, hrrstate, hrstate))
+                    rm.get_rtype(), guid, rstate, state))
             else:
                 # Debug...
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
-                    guid, hrrstate, hrstate))
+                    guid, rstate, state))
+
+                guids.append(guid)
+
                 time.sleep(0.5)
   
     def get_task(self, tid):
@@ -798,8 +804,8 @@ class ExperimentController(object):
         if not guids:
             # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
-            for guid in self.resources:
-                if self.state(guid) == ResourceState.NEW:
+            for guid, rm in self._resources.iteritems():
+                if rm.state == ResourceState.NEW:
                     guids.append(guid)
                 
         if isinstance(guids, int):
index 36816b5..e82ced0 100644 (file)
@@ -274,7 +274,6 @@ class ResourceManager(Logger):
         """
         return copy.deepcopy(cls._attributes[name])
 
-
     @classmethod
     def get_traces(cls):
         """ Returns a copy of the traces
@@ -598,8 +597,12 @@ class ResourceManager(Logger):
         :rtype: str
         """
         attr = self._attrs[name]
+
+        """
+        A.Q. Commenting due to performance impact
         if attr.has_flag(Flags.Global):
             self.warning( "Attribute %s is global. Use get_global instead." % name)
+        """
             
         return attr.value
 
@@ -877,12 +880,12 @@ class ResourceManager(Logger):
             # Verify all start conditions are met
             for (group, state, time) in start_conditions:
                 # Uncomment for debug
-                unmet = []
-                for guid in group:
-                    rm = self.ec.get_resource(guid)
-                    unmet.append((guid, rm._state))
-                
-                self.debug("---- WAITED STATES ---- %s" % unmet )
+                #unmet = []
+                #for guid in group:
+                #    rm = self.ec.get_resource(guid)
+                #    unmet.append((guid, rm._state))
+                #
+                #self.debug("---- WAITED STATES ---- %s" % unmet )
 
                 reschedule, delay = self._needs_reschedule(group, state, time)
                 if reschedule:
@@ -942,6 +945,7 @@ class ResourceManager(Logger):
         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
                 ResourceState.PROVISIONED]:
+            #### XXX: A.Q. IT SHOULD FAIL IF DEPLOY IS CALLED IN OTHER STATES!
             reschedule = True
             self.debug("---- RESCHEDULING DEPLOY ---- state %s " % self.state )
         else:
@@ -1014,6 +1018,7 @@ class ResourceManager(Logger):
 
     def do_fail(self):
         self.set_failed()
+        self.ec.inform_failure(self.guid)
 
     def set_started(self, time = None):
         """ Mark ResourceManager as STARTED """