def abort(self):
+   """ Tells whether the failure level is anything other than OK
+
+   While the failure level is still OK, the resources registered in
+   the EC are scanned; the first one found in state FAILED with its
+   "critical" attribute set switches the failure level to RM_FAILURE.
+
+   :rtype: bool
+   """
    if self._failure_level == FailureLevel.OK:
        for guid in self.ec.resources:
-           state = self.ec.state(guid)
-           critical = self.ec.get(guid, "critical")
-           if state == ResourceState.FAILED and critical:
-               self._failure_level = FailureLevel.RM_FAILURE
-               self.ec.logger.debug("RM critical failure occurred on guid %d." \
-                       " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
-               break
+           try:
+               state = self.ec.state(guid)
+               critical = self.ec.get(guid, "critical")
+               if state == ResourceState.FAILED and critical:
+                   self._failure_level = FailureLevel.RM_FAILURE
+                   self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                           " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+                   break
+           except Exception:
+               # An error might occur because an RM was deleted abruptly
+               # while it was being inspected. In this case the error
+               # should be ignored; any other error is re-raised.
+               # Only Exception is caught, so that KeyboardInterrupt and
+               # SystemExit are never swallowed by this handler.
+               if guid in self.ec._resources:
+                   raise
    return self._failure_level != FailureLevel.OK
def set_ec_failure(self):
self._failure_level = FailureLevel.EC_FAILURE
-
class ECState(object):
""" Possible states for an ExperimentController
# EC state
self._state = ECState.RUNNING
- # Blacklist file for PL nodes
- nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
- plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
- if not os.path.exists(plblacklist_file):
- if os.path.isdir(nepi_home):
- open(plblacklist_file, 'w').close()
- else:
- os.makedirs(nepi_home)
- open(plblacklist_file, 'w').close()
-
# The runner is a pool of threads used to parallelize
# execution of tasks
- nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+ nthreads = int(os.environ.get("NEPI_NTHREADS", "3"))
self._runner = ParallelRun(maxthreads = nthreads)
# Event processing thread
self._thread = threading.Thread(target = self._process)
self._thread.setDaemon(True)
self._thread.start()
-
+
@property
def logger(self):
""" Returns the logger instance of the Experiment Controller
:type guids: list
"""
-
if isinstance(guids, int):
guids = [guids]
:rtype: ResourceManager
"""
- return self._resources.get(guid)
+ rm = self._resources.get(guid)
+ return rm
+
+ def remove_resource(self, guid):
+ del self._resources[guid]
@property
def resources(self):
:rtype: set
"""
- return self._resources.keys()
+ keys = self._resources.keys()
+
+ return keys
def register_resource(self, rtype, guid = None):
""" Registers a new ResourceManager of type 'rtype' in the experiment
:type guids: list
"""
+ if isinstance(guids, int):
+ guids = [guids]
+
if not guids:
guids = self.resources
- # Remove all pending tasks from the scheduler queue
- for tid in list(self._scheduler.pending):
- self._scheduler.remove(tid)
-
- self._runner.empty()
-
for guid in guids:
rm = self.get_resource(guid)
self.schedule("0s", rm.release)
self.wait_released(guids)
+
+ for guid in guids:
+ if self.get(guid, "hardRelease"):
+ self.remove_resource(guid)
def shutdown(self):
""" Releases all resources and stops the ExperimentController
if self._state == ECState.FAILED:
raise RuntimeError("EC failure. Can not exit gracefully")
+ # Remove all pending tasks from the scheduler queue
+ for tid in list(self._scheduler.pending):
+ self._scheduler.remove(tid)
+
+ # Remove pending tasks from the workers queue
+ self._runner.empty()
+
self.release()
# Mark the EC state as TERMINATED