util/guid.py merged into ec.py that is the only one using it
[nepi.git] / src / nepi / execution / ec.py
index 1357e4f..f282ac6 100644 (file)
@@ -3,9 +3,8 @@
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -17,7 +16,8 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.util import guid
+from six import next
+
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
 from nepi.execution.resource import ResourceFactory, ResourceAction, \
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
 from nepi.execution.resource import ResourceFactory, ResourceAction, \
@@ -26,6 +26,7 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 from nepi.util.serializer import ECSerializer, SFormats
 from nepi.util.plotter import ECPlotter, PFormats
 from nepi.execution.trace import TraceAttr
 from nepi.util.serializer import ECSerializer, SFormats
 from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType 
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -40,7 +41,7 @@ import threading
 import weakref
 
 class FailureLevel(object):
 import weakref
 
 class FailureLevel(object):
-    """ Describes the system failure state """
+    """ Possible failure states for the experiment """
     OK = 1
     RM_FAILURE = 2
     EC_FAILURE = 3
     OK = 1
     RM_FAILURE = 2
     EC_FAILURE = 3
@@ -48,20 +49,20 @@ class FailureLevel(object):
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors
     and deciding whether an experiment should be aborted or not
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors
     and deciding whether an experiment should be aborted or not
-
     """
 
     """
 
-    def __init__(self, ec):
-        self._ec = weakref.ref(ec)
+    def __init__(self):
+        self._ec = None
         self._failure_level = FailureLevel.OK
         self._abort = False
 
         self._failure_level = FailureLevel.OK
         self._abort = False
 
+    def set_ec(self, ec):
+        self._ec = weakref.ref(ec)
+
     @property
     def ec(self):
         """ Returns the ExperimentController associated to this FailureManager 
     @property
     def ec(self):
         """ Returns the ExperimentController associated to this FailureManager 
-        
         """
         """
-        
         return self._ec()
 
     @property
         return self._ec()
 
     @property
@@ -69,6 +70,15 @@ class FailureManager(object):
         return self._abort
 
     def eval_failure(self, guid):
         return self._abort
 
     def eval_failure(self, guid):
+        """ Implements failure policy and sets the abort state of the
+        experiment based on the failure state and criticality of
+        the RM
+
+        :param guid: Guid of the RM upon which the failure of the experiment
+            is evaluated
+        :type guid: int
+
+        """
         if self._failure_level == FailureLevel.OK:
             rm = self.ec.get_resource(guid)
             state = rm.state
         if self._failure_level == FailureLevel.OK:
             rm = self.ec.get_resource(guid)
             state = rm.state
@@ -84,24 +94,37 @@ class FailureManager(object):
         self._failure_level = FailureLevel.EC_FAILURE
 
 class ECState(object):
         self._failure_level = FailureLevel.EC_FAILURE
 
 class ECState(object):
-    """ Possible states for an ExperimentController
+    """ Possible states of the ExperimentController
    
     """
     RUNNING = 1
     FAILED = 2
    
     """
     RUNNING = 1
     FAILED = 2
-    TERMINATED = 3
+    RELEASED = 3
+    TERMINATED = 4
+
+# historical note: this class used to be in util/guid.py but is used only here
+# FIXME: This class is not thread-safe. Should it be made thread-safe?
+class GuidGenerator(object):
+    def __init__(self):
+        self._last_guid = 0
+
+    # historical note: this used to be called `next`
+    # which confused 2to3 - and me - while it has
+    # nothing to do at all with the iteration protocol
+    def generate(self, guid = None):
+        if guid == None:
+            guid = self._last_guid + 1
+
+        self._last_guid = self._last_guid if guid <= self._last_guid else guid
+
+        return guid
 
 class ExperimentController(object):
     """
 
 class ExperimentController(object):
     """
-    .. class:: Class Args :
-      
-        :param exp_id: Human readable identifier for the experiment scenario. 
-        :type exp_id: str
-
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
-    behavior, configuration and interconnection of those resources. 
+    and the behavior, configuration and interconnection of those resources. 
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
@@ -116,7 +139,7 @@ class ExperimentController(object):
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
-    In NEPI, an experiment is represented as a graph of interconnected
+    An experiment is represented as a graph of interconnected
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
@@ -126,10 +149,9 @@ class ExperimentController(object):
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
-    To support a new type of resource in NEPI, a new RM must be 
-    implemented. NEPI already provides a variety of
-    RMs to control basic resources, and new can be extended from
-    the existing ones.
+    To support a new type of resource, a new RM must be implemented. 
+    NEPI already provides a variety of RMs to control basic resources, 
+    and new can be extended from the existing ones.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
@@ -160,22 +182,36 @@ class ExperimentController(object):
         ec = serializer.load(filepath)
         return ec
 
         ec = serializer.load(filepath)
         return ec
 
-    def __init__(self, exp_id = None, local_dir = None, persist = False):
-        """ ExperimentController entity to model an execute a network experiment.
+    def __init__(self, exp_id = None, local_dir = None, persist = False,
+            fm = None, add_node_callback = None, add_edge_callback = None, 
+            **kwargs):
+        """ ExperimentController entity to model an execute a network 
+        experiment.
         
         :param exp_id: Human readable name to identify the experiment
         
         :param exp_id: Human readable name to identify the experiment
-        :type name: str
+        :type exp_id: str
 
         :param local_dir: Path to local directory where to store experiment
             related files
 
         :param local_dir: Path to local directory where to store experiment
             related files
-        :type name: str
+        :type local_dir: str
 
         :param persist: Save an XML description of the experiment after 
         completion at local_dir
 
         :param persist: Save an XML description of the experiment after 
         completion at local_dir
-        :type name: bool
+        :type persist: bool
 
 
-        """
+        :param fm: FailureManager object. If None is given, the default 
+            FailureManager class will be used
+        :type fm: FailureManager
+
+        :param add_node_callback: Callback to invoke for node instantiation
+        when automatic topology creation mode is used 
+        :type add_node_callback: function
 
 
+        :param add_edge_callback: Callback to invoke for edge instantiation 
+        when automatic topology creation mode is used 
+        :type add_edge_callback: function
+
+        """
         super(ExperimentController, self).__init__()
 
         # Logging
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -195,7 +231,7 @@ class ExperimentController(object):
 
         # Local path where to store experiment related files (results, etc)
         if not local_dir:
 
         # Local path where to store experiment related files (results, etc)
         if not local_dir:
-            local_dir = tempfile.mkdtemp()
+            local_dir = tempfile.gettempdir() # /tmp
 
         self._local_dir = local_dir
         self._exp_dir = os.path.join(local_dir, self.exp_id)
 
         self._local_dir = local_dir
         self._exp_dir = os.path.join(local_dir, self.exp_id)
@@ -205,7 +241,7 @@ class ExperimentController(object):
         self._persist = persist
 
         # generator of globally unique ids
         self._persist = persist
 
         # generator of globally unique ids
-        self._guid_generator = guid.GuidGenerator()
+        self._guid_generator = GuidGenerator()
         
         # Resource managers
         self._resources = dict()
         
         # Resource managers
         self._resources = dict()
@@ -222,17 +258,25 @@ class ExperimentController(object):
         self._groups = dict()
 
         # generator of globally unique id for groups
         self._groups = dict()
 
         # generator of globally unique id for groups
-        self._group_id_generator = guid.GuidGenerator()
+        self._group_id_generator = GuidGenerator()
 
         # Flag to stop processing thread
         self._stop = False
     
         # Entity in charge of managing system failures
 
         # Flag to stop processing thread
         self._stop = False
     
         # Entity in charge of managing system failures
-        self._fm = FailureManager(self)
+        if not fm:
+            self._fm = FailureManager()
+        self._fm.set_ec(self)
 
         # EC state
         self._state = ECState.RUNNING
 
 
         # EC state
         self._state = ECState.RUNNING
 
+        # Automatically construct experiment description 
+        self._netgraph = None
+        if add_node_callback or add_edge_callback or kwargs.get("topology"):
+            self._build_from_netgraph(add_node_callback, add_edge_callback, 
+                    **kwargs)
+
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         self._nthreads = 20
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         self._nthreads = 20
@@ -251,6 +295,14 @@ class ExperimentController(object):
         """
         return self._logger
 
         """
         return self._logger
 
+    @property
+    def fm(self):
+        """ Returns the failure manager
+
+        """
+
+        return self._fm
+
     @property
     def failure_level(self):
         """ Returns the level of FAILURE of th experiment
     @property
     def failure_level(self):
         """ Returns the level of FAILURE of th experiment
@@ -313,11 +365,20 @@ class ExperimentController(object):
 
     @property
     def persist(self):
 
     @property
     def persist(self):
-        """ If Trie persist the ExperimentController to XML format upon completion
+        """ If True, persists the ExperimentController to XML format upon 
+        experiment completion
 
         """
         return self._persist
 
 
         """
         return self._persist
 
+    @property
+    def netgraph(self):
+        """ Return NetGraph instance if experiment description was automatically 
+        generated
+
+        """
+        return self._netgraph
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -450,8 +511,16 @@ class ExperimentController(object):
         return sec
 
     def save(self, dirpath = None, format = SFormats.XML):
         return sec
 
     def save(self, dirpath = None, format = SFormats.XML):
+        if dirpath == None:
+            dirpath = self.run_dir
+
+        try:
+            os.makedirs(dirpath)
+        except OSError:
+            pass
+
         serializer = ECSerializer()
         serializer = ECSerializer()
-        path = serializer.save(self, dirpath  = None, format = format)
+        path = serializer.save(self, dirpath, format = format)
         return path
 
     def get_task(self, tid):
         return path
 
     def get_task(self, tid):
@@ -478,7 +547,7 @@ class ExperimentController(object):
         return rm
 
     def get_resources_by_type(self, rtype):
         return rm
 
     def get_resources_by_type(self, rtype):
-        """ Returns a registered ResourceManager by its guid
+        """ Returns the ResourceManager objects of type rtype
 
             :param rtype: Resource type
             :type rtype: string
 
             :param rtype: Resource type
             :type rtype: string
@@ -487,8 +556,8 @@ class ExperimentController(object):
             
         """
         rms = []
             
         """
         rms = []
-        for guid, rm in self._resources.iteritems():
-            if rm.get_rtype() == type: 
+        for guid, rm in self._resources.items():
+            if rm.get_rtype() == rtype: 
                 rms.append(rm)
         return rms
 
                 rms.append(rm)
         return rms
 
@@ -497,16 +566,31 @@ class ExperimentController(object):
 
     @property
     def resources(self):
 
     @property
     def resources(self):
-        """ Returns the set() of guids of all the ResourceManager
+        """ Returns the guids of all ResourceManagers 
 
             :return: Set of all RM guids
 
             :return: Set of all RM guids
-            :rtype: set
+            :rtype: list
 
         """
 
         """
-        keys = self._resources.keys()
+        keys = list(self._resources.keys())
 
         return keys
 
 
         return keys
 
+    def filter_resources(self, rtype):
+        """ Returns the guids of all ResourceManagers of type rtype
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of guids
+            
+        """
+        rms = []
+        for guid, rm in self._resources.items():
+            if rm.get_rtype() == rtype: 
+                rms.append(rm.guid)
+        return rms
+
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
@@ -521,7 +605,8 @@ class ExperimentController(object):
             
         """
         # Get next available guid
             
         """
         # Get next available guid
-        guid = self._guid_generator.next(guid)
+        # xxx_next_hiccup
+        guid = self._guid_generator.generate(guid)
         
         # Instantiate RM
         rm = ResourceFactory.create(rtype, self, guid)
         
         # Instantiate RM
         rm = ResourceFactory.create(rtype, self, guid)
@@ -907,7 +992,7 @@ class ExperimentController(object):
         if not guids:
             # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
         if not guids:
             # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
-            for guid, rm in self._resources.iteritems():
+            for guid, rm in self._resources.items():
                 if rm.state == ResourceState.NEW:
                     guids.append(guid)
                 
                 if rm.state == ResourceState.NEW:
                     guids.append(guid)
                 
@@ -919,7 +1004,8 @@ class ExperimentController(object):
         new_group = False
         if not group:
             new_group = True
         new_group = False
         if not group:
             new_group = True
-            group = self._group_id_generator.next()
+            # xxx_next_hiccup
+            group = self._group_id_generator.generate()
 
         if group not in self._groups:
             self._groups[group] = []
 
         if group not in self._groups:
             self._groups[group] = []
@@ -986,6 +1072,9 @@ class ExperimentController(object):
             :type guids: list
 
         """
             :type guids: list
 
         """
+        if self._state == ECState.RELEASED:
+            return 
+
         if isinstance(guids, int):
             guids = [guids]
 
         if isinstance(guids, int):
             guids = [guids]
 
@@ -999,11 +1088,14 @@ class ExperimentController(object):
         self.wait_released(guids)
 
         if self.persist:
         self.wait_released(guids)
 
         if self.persist:
-            self.save(dirpath = self.run_dir)
+            self.save()
 
         for guid in guids:
             if self.get(guid, "hardRelease"):
 
         for guid in guids:
             if self.get(guid, "hardRelease"):
-                self.remove_resource(guid)
+                self.remove_resource(guid)\
+
+        # Mark the EC state as RELEASED
+        self._state = ECState.RELEASED
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
@@ -1114,7 +1206,7 @@ class ExperimentController(object):
             try:
                 self._cond.acquire()
 
             try:
                 self._cond.acquire()
 
-                task = self._scheduler.next()
+                task = next(self._scheduler)
                 
                 if not task:
                     # No task to execute. Wait for a new task to be scheduled.
                 
                 if not task:
                     # No task to execute. Wait for a new task to be scheduled.
@@ -1184,3 +1276,19 @@ class ExperimentController(object):
         self._cond.notify()
         self._cond.release()
 
         self._cond.notify()
         self._cond.release()
 
+    def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
+            **kwargs):
+        """ Automates experiment description using a NetGraph instance.
+        """
+        self._netgraph = NetGraph(**kwargs)
+
+        if add_node_callback:
+            ### Add resources to the EC
+            for nid in self.netgraph.nodes():
+                add_node_callback(self, nid)
+
+        if add_edge_callback:
+            #### Add connections between resources
+            for nid1, nid2 in self.netgraph.edges():
+                add_edge_callback(self, nid1, nid2)
+