register_resource accepts a keyword argument, so that subsequent calls to ec.set...
[nepi.git] / src / nepi / execution / ec.py
index 58ece22..5f34f82 100644 (file)
@@ -3,9 +3,8 @@
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -17,7 +16,8 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.util import guid
+from six import next
+
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
 from nepi.execution.resource import ResourceFactory, ResourceAction, \
@@ -41,7 +41,7 @@ import threading
 import weakref
 
 class FailureLevel(object):
-    """ Describes the system failure state """
+    """ Possible failure states for the experiment """
     OK = 1
     RM_FAILURE = 2
     EC_FAILURE = 3
@@ -49,20 +49,20 @@ class FailureLevel(object):
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors
     and deciding whether an experiment should be aborted or not
-
     """
 
-    def __init__(self, ec):
-        self._ec = weakref.ref(ec)
+    def __init__(self):
+        self._ec = None
         self._failure_level = FailureLevel.OK
         self._abort = False
 
+    def set_ec(self, ec):
+        self._ec = weakref.ref(ec)
+
     @property
     def ec(self):
         """ Returns the ExperimentController associated to this FailureManager 
-        
         """
-        
         return self._ec()
 
     @property
@@ -70,6 +70,15 @@ class FailureManager(object):
         return self._abort
 
     def eval_failure(self, guid):
+        """ Implements failure policy and sets the abort state of the
+        experiment based on the failure state and criticality of
+        the RM
+
+        :param guid: Guid of the RM upon which the failure of the experiment
+            is evaluated
+        :type guid: int
+
+        """
         if self._failure_level == FailureLevel.OK:
             rm = self.ec.get_resource(guid)
             state = rm.state
@@ -85,7 +94,7 @@ class FailureManager(object):
         self._failure_level = FailureLevel.EC_FAILURE
 
 class ECState(object):
-    """ Possible states for an ExperimentController
+    """ Possible states of the ExperimentController
    
     """
     RUNNING = 1
@@ -93,13 +102,25 @@ class ECState(object):
     RELEASED = 3
     TERMINATED = 4
 
+# historical note: this class used to be in util/guid.py but is used only here
+# FIXME: This class is not thread-safe. Should it be made thread-safe?
+class GuidGenerator(object):
+    def __init__(self):
+        self._last_guid = 0
+
+    # historical note: this used to be called `next`
+    # which confused 2to3 - and me - while it has
+    # nothing to do at all with the iteration protocol
+    def generate(self, guid = None):
+        if guid == None:
+            guid = self._last_guid + 1
+
+        self._last_guid = self._last_guid if guid <= self._last_guid else guid
+
+        return guid
+
 class ExperimentController(object):
     """
-    .. class:: Class Args :
-      
-        :param exp_id: Human readable identifier for the experiment scenario. 
-        :type exp_id: str
-
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
@@ -162,7 +183,8 @@ class ExperimentController(object):
         return ec
 
     def __init__(self, exp_id = None, local_dir = None, persist = False,
-            add_node_callback = None, add_edge_callback = None, **kwargs):
+            fm = None, add_node_callback = None, add_edge_callback = None, 
+            **kwargs):
         """ ExperimentController entity to model an execute a network 
         experiment.
         
@@ -177,6 +199,10 @@ class ExperimentController(object):
         completion at local_dir
         :type persist: bool
 
+        :param fm: FailureManager object. If None is given, the default 
+            FailureManager class will be used
+        :type fm: FailureManager
+
         :param add_node_callback: Callback to invoke for node instantiation
         when automatic topology creation mode is used 
         :type add_node_callback: function
@@ -215,7 +241,7 @@ class ExperimentController(object):
         self._persist = persist
 
         # generator of globally unique ids
-        self._guid_generator = guid.GuidGenerator()
+        self._guid_generator = GuidGenerator()
         
         # Resource managers
         self._resources = dict()
@@ -232,13 +258,15 @@ class ExperimentController(object):
         self._groups = dict()
 
         # generator of globally unique id for groups
-        self._group_id_generator = guid.GuidGenerator()
+        self._group_id_generator = GuidGenerator()
 
         # Flag to stop processing thread
         self._stop = False
     
         # Entity in charge of managing system failures
-        self._fm = FailureManager(self)
+        if not fm:
+            self._fm = FailureManager()
+        self._fm.set_ec(self)
 
         # EC state
         self._state = ECState.RUNNING
@@ -267,6 +295,14 @@ class ExperimentController(object):
         """
         return self._logger
 
+    @property
+    def fm(self):
+        """ Returns the failure manager
+
+        """
+
+        return self._fm
+
     @property
     def failure_level(self):
         """ Returns the level of FAILURE of th experiment
@@ -520,7 +556,7 @@ class ExperimentController(object):
             
         """
         rms = []
-        for guid, rm in self._resources.iteritems():
+        for guid, rm in self._resources.items():
             if rm.get_rtype() == rtype: 
                 rms.append(rm)
         return rms
@@ -536,7 +572,7 @@ class ExperimentController(object):
             :rtype: list
 
         """
-        keys = self._resources.keys()
+        keys = list(self._resources.keys())
 
         return keys
 
@@ -550,12 +586,12 @@ class ExperimentController(object):
             
         """
         rms = []
-        for guid, rm in self._resources.iteritems():
+        for guid, rm in self._resources.items():
             if rm.get_rtype() == rtype: 
                 rms.append(rm.guid)
         return rms
 
-    def register_resource(self, rtype, guid = None):
+    def register_resource(self, rtype, guid = None, **keywords):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
         This method will assign a new 'guid' for the RM, if no guid
@@ -569,7 +605,8 @@ class ExperimentController(object):
             
         """
         # Get next available guid
-        guid = self._guid_generator.next(guid)
+        # xxx_next_hiccup
+        guid = self._guid_generator.generate(guid)
         
         # Instantiate RM
         rm = ResourceFactory.create(rtype, self, guid)
@@ -577,6 +614,18 @@ class ExperimentController(object):
         # Store RM
         self._resources[guid] = rm
 
+        ### so we can do something like
+        # node = ec.register_resource("linux::Node",
+        #                             username = user,
+        #                             hostname = host)
+        ### instead of
+        # node = ec.register_resource("linux::Node")
+        # ec.set(node, "username", user)
+        # ec.set(node, "hostname", host)
+
+        for name, value in keywords.items():
+            self.set(guid, name, value)
+
         return guid
 
     def get_attributes(self, guid):
@@ -955,7 +1004,7 @@ class ExperimentController(object):
         if not guids:
             # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
-            for guid, rm in self._resources.iteritems():
+            for guid, rm in self._resources.items():
                 if rm.state == ResourceState.NEW:
                     guids.append(guid)
                 
@@ -967,7 +1016,8 @@ class ExperimentController(object):
         new_group = False
         if not group:
             new_group = True
-            group = self._group_id_generator.next()
+            # xxx_next_hiccup
+            group = self._group_id_generator.generate()
 
         if group not in self._groups:
             self._groups[group] = []
@@ -1035,7 +1085,7 @@ class ExperimentController(object):
 
         """
         if self._state == ECState.RELEASED:
-           return 
+            return 
 
         if isinstance(guids, int):
             guids = [guids]
@@ -1168,7 +1218,7 @@ class ExperimentController(object):
             try:
                 self._cond.acquire()
 
-                task = self._scheduler.next()
+                task = next(self._scheduler)
                 
                 if not task:
                     # No task to execute. Wait for a new task to be scheduled.