patching concurrency issues in the EC
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 4 May 2014 16:29:56 +0000 (18:29 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sun, 4 May 2014 16:29:56 +0000 (18:29 +0200)
src/nepi/execution/ec.py

index 776881c..5584c68 100644 (file)
@@ -64,20 +64,25 @@ class FailureManager(object):
     def abort(self):
         if self._failure_level == FailureLevel.OK:
             for guid in self.ec.resources:
-                state = self.ec.state(guid)
-                critical = self.ec.get(guid, "critical")
-                if state == ResourceState.FAILED and critical:
-                    self._failure_level = FailureLevel.RM_FAILURE
-                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
-                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
-                    break
+                try:
+                    state = self.ec.state(guid)
+                    critical = self.ec.get(guid, "critical")
+                    if state == ResourceState.FAILED and critical:
+                        self._failure_level = FailureLevel.RM_FAILURE
+                        self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                                " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+                        break
+                except:
+                    # An error might occure because a RM was deleted abruptly.
+                    # In this case the error should be ignored.
+                    if guid in self.ec._resources:
+                        raise
 
         return self._failure_level != FailureLevel.OK
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
 
-
 class ECState(object):
     """ Possible states for an ExperimentController
    
@@ -216,7 +221,7 @@ class ExperimentController(object):
         self._thread = threading.Thread(target = self._process)
         self._thread.setDaemon(True)
         self._thread.start()
-
+        
     @property
     def logger(self):
         """ Returns the logger instance of the Experiment Controller
@@ -378,7 +383,11 @@ class ExperimentController(object):
             :rtype: ResourceManager
             
         """
-        return self._resources.get(guid)
+        rm = self._resources.get(guid)
+        return rm
+
+    def remove_resource(self, guid):
+        del self._resources[guid]
 
     @property
     def resources(self):
@@ -388,7 +397,9 @@ class ExperimentController(object):
             :rtype: set
 
         """
-        return self._resources.keys()
+        keys = self._resources.keys()
+
+        return keys
 
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
@@ -883,7 +894,7 @@ class ExperimentController(object):
 
         for guid in guids:
             if self.get(guid, "hardRelease"):
-                del self._resources[guid]
+                self.remove_resource(guid)
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController