X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=5f34f8268c0da3fc35bf16d7fc21c782bd586bda;hb=8ee426bdd9f75fc977cb40281f95c6eebc6c44c9;hp=58ece229e25b4fe8076376c46587806c607a2742;hpb=d8144cd833c3a8e82d9580655787b491e768e4f8;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 58ece229..5f34f826 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -17,7 +16,8 @@ # # Author: Alina Quereilhac -from nepi.util import guid +from six import next + from nepi.util.parallel import ParallelRun from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat from nepi.execution.resource import ResourceFactory, ResourceAction, \ @@ -41,7 +41,7 @@ import threading import weakref class FailureLevel(object): - """ Describes the system failure state """ + """ Possible failure states for the experiment """ OK = 1 RM_FAILURE = 2 EC_FAILURE = 3 @@ -49,20 +49,20 @@ class FailureLevel(object): class FailureManager(object): """ The FailureManager is responsible for handling errors and deciding whether an experiment should be aborted or not - """ - def __init__(self, ec): - self._ec = weakref.ref(ec) + def __init__(self): + self._ec = None self._failure_level = FailureLevel.OK self._abort = False + def set_ec(self, ec): + self._ec = weakref.ref(ec) + @property def ec(self): """ Returns the ExperimentController associated to this FailureManager - """ - return self._ec() @property @@ -70,6 +70,15 @@ class FailureManager(object): return self._abort def eval_failure(self, guid): + """ Implements failure policy and sets the abort state of the + experiment based on the failure state and criticality of + the RM + + :param guid: Guid of the RM upon which the failure of the experiment + is evaluated + :type guid: int + + """ if self._failure_level == FailureLevel.OK: rm = self.ec.get_resource(guid) state = rm.state @@ -85,7 +94,7 @@ class FailureManager(object): self._failure_level = FailureLevel.EC_FAILURE class ECState(object): - """ Possible states for an ExperimentController + """ Possible states of the ExperimentController """ RUNNING = 1 @@ -93,13 +102,25 @@ class ECState(object): RELEASED = 3 TERMINATED = 4 +# historical note: this class used to be in util/guid.py but is used only here +# FIXME: This class is not thread-safe. Should it be made thread-safe? +class GuidGenerator(object): + def __init__(self): + self._last_guid = 0 + + # historical note: this used to be called `next` + # which confused 2to3 - and me - while it has + # nothing to do at all with the iteration protocol + def generate(self, guid = None): + if guid == None: + guid = self._last_guid + 1 + + self._last_guid = self._last_guid if guid <= self._last_guid else guid + + return guid + class ExperimentController(object): """ - .. class:: Class Args : - - :param exp_id: Human readable identifier for the experiment scenario. - :type exp_id: str - .. note:: An experiment, or scenario, is defined by a concrete set of resources, @@ -162,7 +183,8 @@ class ExperimentController(object): return ec def __init__(self, exp_id = None, local_dir = None, persist = False, - add_node_callback = None, add_edge_callback = None, **kwargs): + fm = None, add_node_callback = None, add_edge_callback = None, + **kwargs): """ ExperimentController entity to model an execute a network experiment. @@ -177,6 +199,10 @@ class ExperimentController(object): completion at local_dir :type persist: bool + :param fm: FailureManager object. If None is given, the default + FailureManager class will be used + :type fm: FailureManager + :param add_node_callback: Callback to invoke for node instantiation when automatic topology creation mode is used :type add_node_callback: function @@ -215,7 +241,7 @@ class ExperimentController(object): self._persist = persist # generator of globally unique ids - self._guid_generator = guid.GuidGenerator() + self._guid_generator = GuidGenerator() # Resource managers self._resources = dict() @@ -232,13 +258,15 @@ class ExperimentController(object): self._groups = dict() # generator of globally unique id for groups - self._group_id_generator = guid.GuidGenerator() + self._group_id_generator = GuidGenerator() # Flag to stop processing thread self._stop = False # Entity in charge of managing system failures - self._fm = FailureManager(self) + if not fm: + self._fm = FailureManager() + self._fm.set_ec(self) # EC state self._state = ECState.RUNNING @@ -267,6 +295,14 @@ class ExperimentController(object): """ return self._logger + @property + def fm(self): + """ Returns the failure manager + + """ + + return self._fm + @property def failure_level(self): """ Returns the level of FAILURE of th experiment @@ -520,7 +556,7 @@ class ExperimentController(object): """ rms = [] - for guid, rm in self._resources.iteritems(): + for guid, rm in self._resources.items(): if rm.get_rtype() == rtype: rms.append(rm) return rms @@ -536,7 +572,7 @@ class ExperimentController(object): :rtype: list """ - keys = self._resources.keys() + keys = list(self._resources.keys()) return keys @@ -550,12 +586,12 @@ class ExperimentController(object): """ rms = [] - for guid, rm in self._resources.iteritems(): + for guid, rm in self._resources.items(): if rm.get_rtype() == rtype: rms.append(rm.guid) return rms - def register_resource(self, rtype, guid = None): + def register_resource(self, rtype, guid = None, **keywords): """ Registers a new ResourceManager of type 'rtype' in the experiment This method will assign a new 'guid' for the RM, if no guid @@ -569,7 +605,8 @@ class ExperimentController(object): """ # Get next available guid - guid = self._guid_generator.next(guid) + # xxx_next_hiccup + guid = self._guid_generator.generate(guid) # Instantiate RM rm = ResourceFactory.create(rtype, self, guid) @@ -577,6 +614,18 @@ class ExperimentController(object): # Store RM self._resources[guid] = rm + ### so we can do something like + # node = ec.register_resource("linux::Node", + # username = user, + # hostname = host) + ### instead of + # node = ec.register_resource("linux::Node") + # ec.set(node, "username", user) + # ec.set(node, "hostname", host) + + for name, value in keywords.items(): + self.set(guid, name, value) + return guid def get_attributes(self, guid): @@ -955,7 +1004,7 @@ class ExperimentController(object): if not guids: # If no guids list was passed, all 'NEW' RMs will be deployed guids = [] - for guid, rm in self._resources.iteritems(): + for guid, rm in self._resources.items(): if rm.state == ResourceState.NEW: guids.append(guid) @@ -967,7 +1016,8 @@ class ExperimentController(object): new_group = False if not group: new_group = True - group = self._group_id_generator.next() + # xxx_next_hiccup + group = self._group_id_generator.generate() if group not in self._groups: self._groups[group] = [] @@ -1035,7 +1085,7 @@ class ExperimentController(object): """ if self._state == ECState.RELEASED: - return + return if isinstance(guids, int): guids = [guids] @@ -1168,7 +1218,7 @@ class ExperimentController(object): try: self._cond.acquire() - task = self._scheduler.next() + task = next(self._scheduler) if not task: # No task to execute. Wait for a new task to be scheduled.