SFA for PL without specifing hostname
[nepi.git] / src / nepi / execution / ec.py
index 1d337e8..ad69893 100644 (file)
@@ -64,20 +64,25 @@ class FailureManager(object):
     def abort(self):
         if self._failure_level == FailureLevel.OK:
             for guid in self.ec.resources:
-                state = self.ec.state(guid)
-                critical = self.ec.get(guid, "critical")
-                if state == ResourceState.FAILED and critical:
-                    self._failure_level = FailureLevel.RM_FAILURE
-                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
-                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
-                    break
+                try:
+                    state = self.ec.state(guid)
+                    critical = self.ec.get(guid, "critical")
+                    if state == ResourceState.FAILED and critical:
+                        self._failure_level = FailureLevel.RM_FAILURE
+                        self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                                " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+                        break
+                except:
+                    # An error might occure because a RM was deleted abruptly.
+                    # In this case the error should be ignored.
+                    if guid in self.ec._resources:
+                        raise
 
         return self._failure_level != FailureLevel.OK
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
 
-
 class ECState(object):
     """ Possible states for an ExperimentController
    
@@ -196,19 +201,9 @@ class ExperimentController(object):
         # EC state
         self._state = ECState.RUNNING
 
-        # Blacklist file for PL nodes
-        nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
-        plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
-        if not os.path.exists(plblacklist_file):
-            if os.path.isdir(nepi_home):
-                open(plblacklist_file, 'w').close()
-            else:
-                os.makedirs(nepi_home)
-                open(plblacklist_file, 'w').close()
-                    
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "3"))
         self._runner = ParallelRun(maxthreads = nthreads)
 
         # Event processing thread
@@ -216,7 +211,7 @@ class ExperimentController(object):
         self._thread = threading.Thread(target = self._process)
         self._thread.setDaemon(True)
         self._thread.start()
-
+        
     @property
     def logger(self):
         """ Returns the logger instance of the Experiment Controller
@@ -327,7 +322,6 @@ class ExperimentController(object):
             :type guids: list
         
         """
-        
         if isinstance(guids, int):
             guids = [guids]
 
@@ -378,7 +372,11 @@ class ExperimentController(object):
             :rtype: ResourceManager
             
         """
-        return self._resources.get(guid)
+        rm = self._resources.get(guid)
+        return rm
+
+    def remove_resource(self, guid):
+        del self._resources[guid]
 
     @property
     def resources(self):
@@ -388,7 +386,9 @@ class ExperimentController(object):
             :rtype: set
 
         """
-        return self._resources.keys()
+        keys = self._resources.keys()
+
+        return keys
 
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
@@ -869,20 +869,21 @@ class ExperimentController(object):
             :type guids: list
 
         """
+        if isinstance(guids, int):
+            guids = [guids]
+
         if not guids:
             guids = self.resources
 
-        # Remove all pending tasks from the scheduler queue
-        for tid in list(self._scheduler.pending):
-            self._scheduler.remove(tid)
-
-        self._runner.empty()
-
         for guid in guids:
             rm = self.get_resource(guid)
             self.schedule("0s", rm.release)
 
         self.wait_released(guids)
+
+        for guid in guids:
+            if self.get(guid, "hardRelease"):
+                self.remove_resource(guid)
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
@@ -892,6 +893,13 @@ class ExperimentController(object):
         if self._state == ECState.FAILED:
             raise RuntimeError("EC failure. Can not exit gracefully")
 
+        # Remove all pending tasks from the scheduler queue
+        for tid in list(self._scheduler.pending):
+            self._scheduler.remove(tid)
+
+        # Remove pending tasks from the workers queue
+        self._runner.empty()
+
         self.release()
 
         # Mark the EC state as TERMINATED