ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
+from nepi.util.serializer import ECSerializer, SFormats
# TODO: use multiprocessing instead of threading
# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
def __init__(self, ec):
self._ec = weakref.ref(ec)
self._failure_level = FailureLevel.OK
+ self._abort = False
@property
def ec(self):
@property
def abort(self):
+ return self._abort
+
+ def eval_failure(self, guid):
if self._failure_level == FailureLevel.OK:
- for guid in self.ec.resources:
- state = self.ec.state(guid)
- critical = self.ec.get(guid, "critical")
- if state == ResourceState.FAILED and critical:
- self._failure_level = FailureLevel.RM_FAILURE
- self.ec.logger.debug("RM critical failure occurred on guid %d." \
- " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
- break
+ rm = self.ec.get_resource(guid)
+ state = rm.state
+ critical = rm.get("critical")
- return self._failure_level != FailureLevel.OK
+ if state == ResourceState.FAILED and critical:
+ self._failure_level = FailureLevel.RM_FAILURE
+ self._abort = True
+ self.ec.logger.debug("RM critical failure occurred on guid %d." \
+ " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
def set_ec_failure(self):
self._failure_level = FailureLevel.EC_FAILURE
-
class ECState(object):
""" Possible states for an ExperimentController
"""
+ @classmethod
+ def load(cls, path, format = SFormats.XML):
+ serializer = ECSerializer()
+ ec = serializer.load(path)
+ return ec
+
def __init__(self, exp_id = None):
super(ExperimentController, self).__init__()
# The runner is a pool of threads used to parallelize
# execution of tasks
- nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
- self._runner = ParallelRun(maxthreads = nthreads)
+ self._nthreads = 20
+ self._runner = None
# Event processing thread
self._cond = threading.Condition()
self._thread = threading.Thread(target = self._process)
self._thread.setDaemon(True)
self._thread.start()
-
+
@property
def logger(self):
""" Returns the logger instance of the Experiment Controller
"""
return self._logger
+ @property
+ def failure_level(self):
+ """ Returns the current failure level of the experiment, as
+ recorded in self._fm
+
+ """
+
+ return self._fm._failure_level
+
@property
def ecstate(self):
""" Returns the state of the Experiment Controller
"""
return self._run_id
+ @property
+ def nthreads(self):
+ """ Returns the maximum number of threads configured for the
+ task runner
+
+ """
+ return self._nthreads
+
+
@property
def abort(self):
""" Returns True if the experiment has failed and should be interrupted,
False otherwise. The value is read from self._fm.
"""
return self._fm.abort
+ def inform_failure(self, guid):
+ """ Reports a failure in a RM to the EC for evaluation
+
+ :param guid: Resource id
+ :type guid: int
+
+ :return: Whatever self._fm.eval_failure returns for 'guid'
+
+ """
+
+ return self._fm.eval_failure(guid)
+
def wait_finished(self, guids):
""" Blocking method that waits until all RMs in the 'guids' list
have reached a state >= STOPPED (i.e. STOPPED, FAILED or
:type guids: list
"""
-
if isinstance(guids, int):
guids = [guids]
break
# If a guid reached one of the target states, remove it from list
- guid = guids[0]
- rstate = self.state(guid)
+ guid = guids.pop()
+ rm = self.get_resource(guid)
+ rstate = rm.state
- hrrstate = ResourceState2str.get(rstate)
- hrstate = ResourceState2str.get(state)
-
if rstate >= state:
- guids.remove(guid)
- rm = self.get_resource(guid)
self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
- rm.get_rtype(), guid, hrrstate, hrstate))
+ rm.get_rtype(), guid, rstate, state))
else:
# Debug...
self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
- guid, hrrstate, hrstate))
+ guid, rstate, state))
+
+ guids.append(guid)
+
time.sleep(0.5)
-
+
+ def serialize(self, format = SFormats.XML):
+ serializer = ECSerializer()
+ sec = serializer.load(self, format = format)
+ return sec
+
+ def save(self, path, format = SFormats.XML):
+ """ Saves this ExperimentController through ECSerializer.save
+
+ :param path: Destination passed on to the serializer
+ :type path: str
+
+ :param format: Serialization format (defaults to SFormats.XML)
+
+ :return: The path reported back by ECSerializer.save
+
+ """
+ serializer = ECSerializer()
+ path = serializer.save(self, path, format = format)
+ return path
+
def get_task(self, tid):
""" Returns a task by its id
def get_resource(self, guid):
""" Returns a registered ResourceManager by its guid
- :param guid: Id of the task
+ :param guid: Id of the resource
:type guid: int
:rtype: ResourceManager
"""
- return self._resources.get(guid)
+ rm = self._resources.get(guid)
+ return rm
+
+ def get_resources_by_type(self, rtype):
+ """ Returns a registered ResourceManager by its guid
+
+ :param rtype: Resource type
+ :type rtype: string
+
+ :rtype: list of ResourceManagers
+
+ """
+ rms = []
+ for guid, rm in self._resources.iteritems():
+ if rm.get_rtype() == type:
+ rms.append(rm)
+ return rms
+
+ def remove_resource(self, guid):
+ """ Removes the ResourceManager registered under 'guid' from
+ self._resources
+
+ :param guid: Id of the resource
+ :type guid: int
+
+ A KeyError is raised if 'guid' is not registered.
+
+ """
+ del self._resources[guid]
@property
def resources(self):
:rtype: set
"""
- return self._resources.keys()
+ keys = self._resources.keys()
+
+ return keys
def register_resource(self, rtype, guid = None):
""" Registers a new ResourceManager of type 'rtype' in the experiment
"""
rm = self.get_resource(guid)
- return rm.set(name, value)
+ rm.set(name, value)
+
+ def get_global(self, rtype, name):
+ """ Returns the value of the global attribute with name 'name' on the
+ RMs of rtype 'rtype'.
+
+ :param rtype: Resource type whose global attribute is read
+ :type rtype: str
+
+ :param name: Name of the attribute
+ :type name: str
+
+ :return: The value of the attribute with name 'name'
+
+ """
+ rclass = ResourceFactory.get_resource_type(rtype)
+ return rclass.get_global(name)
+
+ def set_global(self, rtype, name, value):
+ """ Modifies the value of the global attribute with name 'name' on the
+ RMs with rtype 'rtype'.
+
+ :param rtype: Resource type whose global attribute is modified
+ :type rtype: str
+
+ :param name: Name of the attribute
+ :type name: str
+
+ :param value: Value of the attribute
+
+ :return: Whatever the resource class' set_global returns
+
+ """
+ rclass = ResourceFactory.get_resource_type(rtype)
+ return rclass.set_global(name, value)
def state(self, guid, hr = False):
""" Returns the state of a resource
rm = self.get_resource(guid)
return rm.start()
+ def get_start_time(self, guid):
+ """ Returns the start time of the RM as a timestamp
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
+ rm = self.get_resource(guid)
+ return rm.start_time
+
+ def get_stop_time(self, guid):
+ """ Returns the stop time of the RM as a timestamp
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
+ rm = self.get_resource(guid)
+ return rm.stop_time
+
+ def get_discover_time(self, guid):
+ """ Returns the discover time of the RM as a timestamp
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
+ rm = self.get_resource(guid)
+ return rm.discover_time
+
+ def get_provision_time(self, guid):
+ """ Returns the provision time of the RM as a timestamp
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
+ rm = self.get_resource(guid)
+ return rm.provision_time
+
+ def get_ready_time(self, guid):
+ """ Returns the deployment time of the RM as a timestamp
+ (the RM's 'ready_time' attribute)
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
+ rm = self.get_resource(guid)
+ return rm.ready_time
+
+ def get_release_time(self, guid):
+ """ Returns the release time of the RM as a timestamp
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
+ rm = self.get_resource(guid)
+ return rm.release_time
+
+ def get_failed_time(self, guid):
+ """ Returns the time the failure occurred for the RM as a timestamp
+
+ :param guid: Guid of the RM
+ :type guid: int
+
+ """
+ rm = self.get_resource(guid)
+ return rm.failed_time
+
def set_with_conditions(self, name, value, guids1, guids2, state,
time = None):
""" Modifies the value of attribute with name 'name' on all RMs
if not guids:
# If no guids list was passed, all 'NEW' RMs will be deployed
guids = []
- for guid in self.resources:
- if self.state(guid) == ResourceState.NEW:
+ for guid, rm in self._resources.iteritems():
+ if rm.state == ResourceState.NEW:
guids.append(guid)
if isinstance(guids, int):
:type guids: list
"""
+ if isinstance(guids, int):
+ guids = [guids]
+
if not guids:
guids = self.resources
- # Remove all pending tasks from the scheduler queue
- for tid in list(self._scheduler.pending):
- self._scheduler.remove(tid)
-
- self._runner.empty()
-
for guid in guids:
rm = self.get_resource(guid)
self.schedule("0s", rm.release)
self.wait_released(guids)
+
+ for guid in guids:
+ if self.get(guid, "hardRelease"):
+ self.remove_resource(guid)
def shutdown(self):
""" Releases all resources and stops the ExperimentController
if self._state == ECState.FAILED:
raise RuntimeError("EC failure. Can not exit gracefully")
+ # Remove all pending tasks from the scheduler queue
+ for tid in list(self._scheduler.pending):
+ self._scheduler.remove(tid)
+
+ # Remove pending tasks from the workers queue
+ self._runner.empty()
+
self.release()
# Mark the EC state as TERMINATED
"""
+ self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
+ self._runner = ParallelRun(maxthreads = self.nthreads)
self._runner.start()
while not self._stop:
:type task: Task
"""
- # Invoke callback
- task.status = TaskStatus.DONE
-
try:
+ # Invoke callback
task.result = task.callback()
+ task.status = TaskStatus.DONE
except:
import traceback
err = traceback.format_exc()