Adding state RELEASED in EC to allow two stage termination from the experiment runner
[nepi.git] / src / nepi / execution / ec.py
index 2c6557f..58ece22 100644 (file)
@@ -26,6 +26,7 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 from nepi.util.serializer import ECSerializer, SFormats
 from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType 
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -34,6 +35,7 @@ import functools
 import logging
 import os
 import sys
+import tempfile
 import time
 import threading
 import weakref
@@ -88,7 +90,8 @@ class ECState(object):
     """
     RUNNING = 1
     FAILED = 2
-    TERMINATED = 3
+    RELEASED = 3
+    TERMINATED = 4
 
 class ExperimentController(object):
     """
@@ -100,7 +103,7 @@ class ExperimentController(object):
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
-    behavior, configuration and interconnection of those resources. 
+    and the behavior, configuration and interconnection of those resources. 
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
@@ -115,7 +118,7 @@ class ExperimentController(object):
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
-    In NEPI, an experiment is represented as a graph of interconnected
+    An experiment is represented as a graph of interconnected
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
@@ -125,10 +128,9 @@ class ExperimentController(object):
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
-    To support a new type of resource in NEPI, a new RM must be 
-    implemented. NEPI already provides a variety of
-    RMs to control basic resources, and new can be extended from
-    the existing ones.
+    To support a new type of resource, a new RM must be implemented. 
+    NEPI already provides a variety of RMs to control basic resources, 
+    and new can be extended from the existing ones.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
@@ -150,7 +152,7 @@ class ExperimentController(object):
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
-        
+   
     """
 
     @classmethod
@@ -159,7 +161,31 @@ class ExperimentController(object):
         ec = serializer.load(filepath)
         return ec
 
-    def __init__(self, exp_id = None): 
+    def __init__(self, exp_id = None, local_dir = None, persist = False,
+            add_node_callback = None, add_edge_callback = None, **kwargs):
+        """ ExperimentController entity to model an execute a network 
+        experiment.
+        
+        :param exp_id: Human readable name to identify the experiment
+        :type exp_id: str
+
+        :param local_dir: Path to local directory where to store experiment
+            related files
+        :type local_dir: str
+
+        :param persist: Save an XML description of the experiment after 
+        completion at local_dir
+        :type persist: bool
+
+        :param add_node_callback: Callback to invoke for node instantiation
+        when automatic topology creation mode is used 
+        :type add_node_callback: function
+
+        :param add_edge_callback: Callback to invoke for edge instantiation 
+        when automatic topology creation mode is used 
+        :type add_edge_callback: function
+
+        """
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -177,6 +203,17 @@ class ExperimentController(object):
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
+        # Local path where to store experiment related files (results, etc)
+        if not local_dir:
+            local_dir = tempfile.gettempdir() # /tmp
+
+        self._local_dir = local_dir
+        self._exp_dir = os.path.join(local_dir, self.exp_id)
+        self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+        # If True persist the experiment controller in XML format, after completion
+        self._persist = persist
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -206,6 +243,12 @@ class ExperimentController(object):
         # EC state
         self._state = ECState.RUNNING
 
+        # Automatically construct experiment description 
+        self._netgraph = None
+        if add_node_callback or add_edge_callback or kwargs.get("topology"):
+            self._build_from_netgraph(add_node_callback, add_edge_callback, 
+                    **kwargs)
+
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         self._nthreads = 20
@@ -261,7 +304,45 @@ class ExperimentController(object):
         """
         return self._nthreads
 
+    @property
+    def local_dir(self):
+        """ Root local directory for experiment files
+
+        """
+        return self._local_dir
+
+    @property
+    def exp_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment.
+
+        """
+        return self._exp_dir
+
+    @property
+    def run_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment run.
+
+        """
+        return self._run_dir
+
+    @property
+    def persist(self):
+        """ If True, persists the ExperimentController to XML format upon 
+        experiment completion
+
+        """
+        return self._persist
+
+    @property
+    def netgraph(self):
+        """ Return NetGraph instance if experiment description was automatically 
+        generated
+
+        """
+        return self._netgraph
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -394,8 +475,16 @@ class ExperimentController(object):
         return sec
 
     def save(self, dirpath = None, format = SFormats.XML):
+        if dirpath == None:
+            dirpath = self.run_dir
+
+        try:
+            os.makedirs(dirpath)
+        except OSError:
+            pass
+
         serializer = ECSerializer()
-        path = serializer.save(self, dirpath  = None, format = format)
+        path = serializer.save(self, dirpath, format = format)
         return path
 
     def get_task(self, tid):
@@ -422,7 +511,7 @@ class ExperimentController(object):
         return rm
 
     def get_resources_by_type(self, rtype):
-        """ Returns a registered ResourceManager by its guid
+        """ Returns the ResourceManager objects of type rtype
 
             :param rtype: Resource type
             :type rtype: string
@@ -432,7 +521,7 @@ class ExperimentController(object):
         """
         rms = []
         for guid, rm in self._resources.iteritems():
-            if rm.get_rtype() == type: 
+            if rm.get_rtype() == rtype: 
                 rms.append(rm)
         return rms
 
@@ -441,16 +530,31 @@ class ExperimentController(object):
 
     @property
     def resources(self):
-        """ Returns the set() of guids of all the ResourceManager
+        """ Returns the guids of all ResourceManagers 
 
             :return: Set of all RM guids
-            :rtype: set
+            :rtype: list
 
         """
         keys = self._resources.keys()
 
         return keys
 
+    def filter_resources(self, rtype):
+        """ Returns the guids of all ResourceManagers of type rtype
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of guids
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == rtype: 
+                rms.append(rm.guid)
+        return rms
+
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
         
@@ -930,6 +1034,9 @@ class ExperimentController(object):
             :type guids: list
 
         """
+        if self._state == ECState.RELEASED:
+           return 
+
         if isinstance(guids, int):
             guids = [guids]
 
@@ -942,9 +1049,15 @@ class ExperimentController(object):
 
         self.wait_released(guids)
 
+        if self.persist:
+            self.save()
+
         for guid in guids:
             if self.get(guid, "hardRelease"):
-                self.remove_resource(guid)
+                self.remove_resource(guid)\
+
+        # Mark the EC state as RELEASED
+        self._state = ECState.RELEASED
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
@@ -1125,3 +1238,19 @@ class ExperimentController(object):
         self._cond.notify()
         self._cond.release()
 
+    def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
+            **kwargs):
+        """ Automates experiment description using a NetGraph instance.
+        """
+        self._netgraph = NetGraph(**kwargs)
+
+        if add_node_callback:
+            ### Add resources to the EC
+            for nid in self.netgraph.nodes():
+                add_node_callback(self, nid)
+
+        if add_edge_callback:
+            #### Add connections between resources
+            for nid1, nid2 in self.netgraph.edges():
+                add_edge_callback(self, nid1, nid2)
+