X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=ad698932da849c91ae528564c1979de11f1743af;hb=c01d40579e6d77f12327072dec0fb82c41676bb0;hp=7d8b990bfc8f7ae3ee49f69a1371ce67f0fc9420;hpb=68adac66099b08e3daae7a84b29af0f7c69ee955;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 7d8b990b..ad698932 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -64,20 +64,25 @@ class FailureManager(object): def abort(self): if self._failure_level == FailureLevel.OK: for guid in self.ec.resources: - state = self.ec.state(guid) - critical = self.ec.get(guid, "critical") - if state == ResourceState.FAILED and critical: - self._failure_level = FailureLevel.RM_FAILURE - self.ec.logger.debug("RM critical failure occurred on guid %d." \ - " Setting EC FAILURE LEVEL to RM_FAILURE" % guid) - break + try: + state = self.ec.state(guid) + critical = self.ec.get(guid, "critical") + if state == ResourceState.FAILED and critical: + self._failure_level = FailureLevel.RM_FAILURE + self.ec.logger.debug("RM critical failure occurred on guid %d." \ + " Setting EC FAILURE LEVEL to RM_FAILURE" % guid) + break + except: + # An error might occure because a RM was deleted abruptly. + # In this case the error should be ignored. + if guid in self.ec._resources: + raise return self._failure_level != FailureLevel.OK def set_ec_failure(self): self._failure_level = FailureLevel.EC_FAILURE - class ECState(object): """ Possible states for an ExperimentController @@ -198,7 +203,7 @@ class ExperimentController(object): # The runner is a pool of threads used to parallelize # execution of tasks - nthreads = int(os.environ.get("NEPI_NTHREADS", "50")) + nthreads = int(os.environ.get("NEPI_NTHREADS", "3")) self._runner = ParallelRun(maxthreads = nthreads) # Event processing thread @@ -206,7 +211,7 @@ class ExperimentController(object): self._thread = threading.Thread(target = self._process) self._thread.setDaemon(True) self._thread.start() - + @property def logger(self): """ Returns the logger instance of the Experiment Controller @@ -317,7 +322,6 @@ class ExperimentController(object): :type guids: list """ - if isinstance(guids, int): guids = [guids] @@ -368,7 +372,11 @@ class ExperimentController(object): :rtype: ResourceManager """ - return self._resources.get(guid) + rm = self._resources.get(guid) + return rm + + def remove_resource(self, guid): + del self._resources[guid] @property def resources(self): @@ -378,7 +386,9 @@ class ExperimentController(object): :rtype: set """ - return self._resources.keys() + keys = self._resources.keys() + + return keys def register_resource(self, rtype, guid = None): """ Registers a new ResourceManager of type 'rtype' in the experiment @@ -607,7 +617,39 @@ class ExperimentController(object): """ rm = self.get_resource(guid) - return rm.set(name, value) + rm.set(name, value) + + def get_global(self, rtype, name): + """ Returns the value of the global attribute with name 'name' on the + RMs of rtype 'rtype'. + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :return: The value of the attribute with name 'name' + + """ + rclass = ResourceFactory.get_resource_type(rtype) + return rclass.get_global(name) + + def set_global(self, rtype, name, value): + """ Modifies the value of the global attribute with name 'name' on the + RMs of with rtype 'rtype'. + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :param value: Value of the attribute + + """ + rclass = ResourceFactory.get_resource_type(rtype) + return rclass.set_global(name, value) def state(self, guid, hr = False): """ Returns the state of a resource @@ -654,6 +696,41 @@ class ExperimentController(object): rm = self.get_resource(guid) return rm.start() + def get_start_time(self, guid): + """ Returns the start time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.start_time + + def get_stop_time(self, guid): + """ Returns the stop time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.stop_time + + def get_discover_time(self, guid): + """ Returns the discover time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.discover_time + + def get_provision_time(self, guid): + """ Returns the provision time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.provision_time + + def get_ready_time(self, guid): + """ Returns the deployment time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.ready_time + + def get_release_time(self, guid): + """ Returns the release time of the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.release_time + + def get_failed_time(self, guid): + """ Returns the time failure occured for the RM as a timestamp """ + rm = self.get_resource(guid) + return rm.failed_time + def set_with_conditions(self, name, value, guids1, guids2, state, time = None): """ Modifies the value of attribute with name 'name' on all RMs @@ -754,6 +831,11 @@ class ExperimentController(object): rm = self.get_resource(guid) self.schedule("0s", rm.start_with_conditions) + if rm.conditions.get(ResourceAction.STOP): + # Only if the RM has STOP conditions we + # schedule a stop. Otherwise the RM will stop immediately + self.schedule("0s", rm.stop_with_conditions) + if wait_all_ready and new_group: # Schedule a function to check that all resources are # READY, and only then schedule the start. @@ -772,10 +854,10 @@ class ExperimentController(object): if not wait_all_ready: self.schedule("0s", rm.start_with_conditions) - if rm.conditions.get(ResourceAction.STOP): - # Only if the RM has STOP conditions we - # schedule a stop. Otherwise the RM will stop immediately - self.schedule("0s", rm.stop_with_conditions) + if rm.conditions.get(ResourceAction.STOP): + # Only if the RM has STOP conditions we + # schedule a stop. Otherwise the RM will stop immediately + self.schedule("0s", rm.stop_with_conditions) def release(self, guids = None): """ Releases all ResourceManagers in the guids list. @@ -787,20 +869,21 @@ class ExperimentController(object): :type guids: list """ + if isinstance(guids, int): + guids = [guids] + if not guids: guids = self.resources - # Remove all pending tasks from the scheduler queue - for tid in list(self._scheduler.pending): - self._scheduler.remove(tid) - - self._runner.empty() - for guid in guids: rm = self.get_resource(guid) self.schedule("0s", rm.release) self.wait_released(guids) + + for guid in guids: + if self.get(guid, "hardRelease"): + self.remove_resource(guid) def shutdown(self): """ Releases all resources and stops the ExperimentController @@ -810,6 +893,13 @@ class ExperimentController(object): if self._state == ECState.FAILED: raise RuntimeError("EC failure. Can not exit gracefully") + # Remove all pending tasks from the scheduler queue + for tid in list(self._scheduler.pending): + self._scheduler.remove(tid) + + # Remove pending tasks from the workers queue + self._runner.empty() + self.release() # Mark the EC state as TERMINATED @@ -951,11 +1041,10 @@ class ExperimentController(object): :type task: Task """ - # Invoke callback - task.status = TaskStatus.DONE - try: + # Invoke callback task.result = task.callback() + task.status = TaskStatus.DONE except: import traceback err = traceback.format_exc()