scalability benchmark ns3
[nepi.git] / src / nepi / execution / ec.py
index cf41983..c138494 100644 (file)
@@ -25,6 +25,8 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 from nepi.util.serializer import ECSerializer, SFormats
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 from nepi.util.serializer import ECSerializer, SFormats
+from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType 
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -33,12 +35,13 @@ import functools
 import logging
 import os
 import sys
 import logging
 import os
 import sys
+import tempfile
 import time
 import threading
 import weakref
 
 class FailureLevel(object):
 import time
 import threading
 import weakref
 
 class FailureLevel(object):
-    """ Describes the system failure state """
+    """ Possible failure states for the experiment """
     OK = 1
     RM_FAILURE = 2
     EC_FAILURE = 3
     OK = 1
     RM_FAILURE = 2
     EC_FAILURE = 3
@@ -46,20 +49,20 @@ class FailureLevel(object):
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors
     and deciding whether an experiment should be aborted or not
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors
     and deciding whether an experiment should be aborted or not
-
     """
 
     """
 
-    def __init__(self, ec):
-        self._ec = weakref.ref(ec)
+    def __init__(self):
+        self._ec = None
         self._failure_level = FailureLevel.OK
         self._abort = False
 
         self._failure_level = FailureLevel.OK
         self._abort = False
 
+    def set_ec(self, ec):
+        self._ec = weakref.ref(ec)
+
     @property
     def ec(self):
         """ Returns the ExperimentController associated to this FailureManager 
     @property
     def ec(self):
         """ Returns the ExperimentController associated to this FailureManager 
-        
         """
         """
-        
         return self._ec()
 
     @property
         return self._ec()
 
     @property
@@ -67,6 +70,15 @@ class FailureManager(object):
         return self._abort
 
     def eval_failure(self, guid):
         return self._abort
 
     def eval_failure(self, guid):
+        """ Implements failure policy and sets the abort state of the
+        experiment based on the failure state and criticality of
+        the RM
+
+        :param guid: Guid of the RM upon which the failure of the experiment
+            is evaluated
+        :type guid: int
+
+        """
         if self._failure_level == FailureLevel.OK:
             rm = self.ec.get_resource(guid)
             state = rm.state
         if self._failure_level == FailureLevel.OK:
             rm = self.ec.get_resource(guid)
             state = rm.state
@@ -82,24 +94,20 @@ class FailureManager(object):
         self._failure_level = FailureLevel.EC_FAILURE
 
 class ECState(object):
         self._failure_level = FailureLevel.EC_FAILURE
 
 class ECState(object):
-    """ Possible states for an ExperimentController
+    """ Possible states of the ExperimentController
    
     """
     RUNNING = 1
     FAILED = 2
    
     """
     RUNNING = 1
     FAILED = 2
-    TERMINATED = 3
+    RELEASED = 3
+    TERMINATED = 4
 
 class ExperimentController(object):
     """
 
 class ExperimentController(object):
     """
-    .. class:: Class Args :
-      
-        :param exp_id: Human readable identifier for the experiment scenario. 
-        :type exp_id: str
-
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
-    behavior, configuration and interconnection of those resources. 
+    and the behavior, configuration and interconnection of those resources. 
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
@@ -114,7 +122,7 @@ class ExperimentController(object):
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
-    In NEPI, an experiment is represented as a graph of interconnected
+    An experiment is represented as a graph of interconnected
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
@@ -124,10 +132,9 @@ class ExperimentController(object):
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
-    To support a new type of resource in NEPI, a new RM must be 
-    implemented. NEPI already provides a variety of
-    RMs to control basic resources, and new can be extended from
-    the existing ones.
+    To support a new type of resource, a new RM must be implemented. 
+    NEPI already provides a variety of RMs to control basic resources, 
+    and new can be extended from the existing ones.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
@@ -149,16 +156,45 @@ class ExperimentController(object):
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
-        
+   
     """
 
     @classmethod
     """
 
     @classmethod
-    def load(cls, path, format = SFormats.XML):
+    def load(cls, filepath, format = SFormats.XML):
         serializer = ECSerializer()
         serializer = ECSerializer()
-        ec = serializer.load(path)
+        ec = serializer.load(filepath)
         return ec
 
         return ec
 
-    def __init__(self, exp_id = None): 
+    def __init__(self, exp_id = None, local_dir = None, persist = False,
+            fm = None, add_node_callback = None, add_edge_callback = None, 
+            **kwargs):
+        """ ExperimentController entity to model an execute a network 
+        experiment.
+        
+        :param exp_id: Human readable name to identify the experiment
+        :type exp_id: str
+
+        :param local_dir: Path to local directory where to store experiment
+            related files
+        :type local_dir: str
+
+        :param persist: Save an XML description of the experiment after 
+        completion at local_dir
+        :type persist: bool
+
+        :param fm: FailureManager object. If None is given, the default 
+            FailureManager class will be used
+        :type fm: FailureManager
+
+        :param add_node_callback: Callback to invoke for node instantiation
+        when automatic topology creation mode is used 
+        :type add_node_callback: function
+
+        :param add_edge_callback: Callback to invoke for edge instantiation 
+        when automatic topology creation mode is used 
+        :type add_edge_callback: function
+
+        """
         super(ExperimentController, self).__init__()
 
         # Logging
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -176,6 +212,17 @@ class ExperimentController(object):
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
+        # Local path where to store experiment related files (results, etc)
+        if not local_dir:
+            local_dir = tempfile.gettempdir() # /tmp
+
+        self._local_dir = local_dir
+        self._exp_dir = os.path.join(local_dir, self.exp_id)
+        self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+        # If True persist the experiment controller in XML format, after completion
+        self._persist = persist
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -200,11 +247,19 @@ class ExperimentController(object):
         self._stop = False
     
         # Entity in charge of managing system failures
         self._stop = False
     
         # Entity in charge of managing system failures
-        self._fm = FailureManager(self)
+        if not fm:
+            self._fm = FailureManager()
+        self._fm.set_ec(self)
 
         # EC state
         self._state = ECState.RUNNING
 
 
         # EC state
         self._state = ECState.RUNNING
 
+        # Automatically construct experiment description 
+        self._netgraph = None
+        if add_node_callback or add_edge_callback or kwargs.get("topology"):
+            self._build_from_netgraph(add_node_callback, add_edge_callback, 
+                    **kwargs)
+
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         self._nthreads = 20
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         self._nthreads = 20
@@ -223,6 +278,14 @@ class ExperimentController(object):
         """
         return self._logger
 
         """
         return self._logger
 
+    @property
+    def fm(self):
+        """ Returns the failure manager
+
+        """
+
+        return self._fm
+
     @property
     def failure_level(self):
         """ Returns the level of FAILURE of th experiment
     @property
     def failure_level(self):
         """ Returns the level of FAILURE of th experiment
@@ -260,7 +323,45 @@ class ExperimentController(object):
         """
         return self._nthreads
 
         """
         return self._nthreads
 
+    @property
+    def local_dir(self):
+        """ Root local directory for experiment files
+
+        """
+        return self._local_dir
+
+    @property
+    def exp_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment.
+
+        """
+        return self._exp_dir
+
+    @property
+    def run_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment run.
+
+        """
+        return self._run_dir
+
+    @property
+    def persist(self):
+        """ If True, persists the ExperimentController to XML format upon 
+        experiment completion
+
+        """
+        return self._persist
+
+    @property
+    def netgraph(self):
+        """ Return NetGraph instance if experiment description was automatically 
+        generated
+
+        """
+        return self._netgraph
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -381,14 +482,28 @@ class ExperimentController(object):
 
                 time.sleep(0.5)
 
 
                 time.sleep(0.5)
 
+    def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
+        plotter = ECPlotter()
+        fpath = plotter.plot(self, dirpath = dirpath, format= format, 
+                show = show)
+        return fpath
+
     def serialize(self, format = SFormats.XML):
         serializer = ECSerializer()
         sec = serializer.load(self, format = format)
         return sec
 
     def serialize(self, format = SFormats.XML):
         serializer = ECSerializer()
         sec = serializer.load(self, format = format)
         return sec
 
-    def save(self, path, format = SFormats.XML):
+    def save(self, dirpath = None, format = SFormats.XML):
+        if dirpath == None:
+            dirpath = self.run_dir
+
+        try:
+            os.makedirs(dirpath)
+        except OSError:
+            pass
+
         serializer = ECSerializer()
         serializer = ECSerializer()
-        path = serializer.save(self, path, format = format)
+        path = serializer.save(self, dirpath, format = format)
         return path
 
     def get_task(self, tid):
         return path
 
     def get_task(self, tid):
@@ -415,7 +530,7 @@ class ExperimentController(object):
         return rm
 
     def get_resources_by_type(self, rtype):
         return rm
 
     def get_resources_by_type(self, rtype):
-        """ Returns a registered ResourceManager by its guid
+        """ Returns the ResourceManager objects of type rtype
 
             :param rtype: Resource type
             :type rtype: string
 
             :param rtype: Resource type
             :type rtype: string
@@ -425,7 +540,7 @@ class ExperimentController(object):
         """
         rms = []
         for guid, rm in self._resources.iteritems():
         """
         rms = []
         for guid, rm in self._resources.iteritems():
-            if rm.get_rtype() == type: 
+            if rm.get_rtype() == rtype: 
                 rms.append(rm)
         return rms
 
                 rms.append(rm)
         return rms
 
@@ -434,16 +549,31 @@ class ExperimentController(object):
 
     @property
     def resources(self):
 
     @property
     def resources(self):
-        """ Returns the set() of guids of all the ResourceManager
+        """ Returns the guids of all ResourceManagers 
 
             :return: Set of all RM guids
 
             :return: Set of all RM guids
-            :rtype: set
+            :rtype: list
 
         """
         keys = self._resources.keys()
 
         return keys
 
 
         """
         keys = self._resources.keys()
 
         return keys
 
+    def filter_resources(self, rtype):
+        """ Returns the guids of all ResourceManagers of type rtype
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of guids
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == rtype: 
+                rms.append(rm.guid)
+        return rms
+
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
@@ -923,6 +1053,9 @@ class ExperimentController(object):
             :type guids: list
 
         """
             :type guids: list
 
         """
+        if self._state == ECState.RELEASED:
+           return 
+
         if isinstance(guids, int):
             guids = [guids]
 
         if isinstance(guids, int):
             guids = [guids]
 
@@ -935,9 +1068,15 @@ class ExperimentController(object):
 
         self.wait_released(guids)
 
 
         self.wait_released(guids)
 
+        if self.persist:
+            self.save()
+
         for guid in guids:
             if self.get(guid, "hardRelease"):
         for guid in guids:
             if self.get(guid, "hardRelease"):
-                self.remove_resource(guid)
+                self.remove_resource(guid)\
+
+        # Mark the EC state as RELEASED
+        self._state = ECState.RELEASED
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
@@ -1118,3 +1257,19 @@ class ExperimentController(object):
         self._cond.notify()
         self._cond.release()
 
         self._cond.notify()
         self._cond.release()
 
+    def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
+            **kwargs):
+        """ Automates experiment description using a NetGraph instance.
+        """
+        self._netgraph = NetGraph(**kwargs)
+
+        if add_node_callback:
+            ### Add resources to the EC
+            for nid in self.netgraph.nodes():
+                add_node_callback(self, nid)
+
+        if add_edge_callback:
+            #### Add connections between resources
+            for nid1, nid2 in self.netgraph.edges():
+                add_edge_callback(self, nid1, nid2)
+