still making both branches closer
[nepi.git] / src / nepi / execution / ec.py
index 776881c..d93ef7a 100644 (file)
@@ -3,9 +3,8 @@
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -24,6 +23,9 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
         ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
+from nepi.util.serializer import ECSerializer, SFormats
+from nepi.util.plotter import ECPlotter, PFormats
+from nepi.util.netgraph import NetGraph, TopologyType 
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -32,12 +34,13 @@ import functools
 import logging
 import os
 import sys
 import logging
 import os
 import sys
+import tempfile
 import time
 import threading
 import weakref
 
 class FailureLevel(object):
 import time
 import threading
 import weakref
 
 class FailureLevel(object):
-    """ Describes the system failure state """
+    """ Possible failure states for the experiment """
     OK = 1
     RM_FAILURE = 2
     EC_FAILURE = 3
     OK = 1
     RM_FAILURE = 2
     EC_FAILURE = 3
@@ -45,58 +48,65 @@ class FailureLevel(object):
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors
     and deciding whether an experiment should be aborted or not
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors
     and deciding whether an experiment should be aborted or not
-
     """
 
     """
 
-    def __init__(self, ec):
-        self._ec = weakref.ref(ec)
+    def __init__(self):
+        self._ec = None
         self._failure_level = FailureLevel.OK
         self._failure_level = FailureLevel.OK
+        self._abort = False
+
+    def set_ec(self, ec):
+        self._ec = weakref.ref(ec)
 
     @property
     def ec(self):
         """ Returns the ExperimentController associated to this FailureManager 
 
     @property
     def ec(self):
         """ Returns the ExperimentController associated to this FailureManager 
-        
         """
         """
-        
         return self._ec()
 
     @property
     def abort(self):
         return self._ec()
 
     @property
     def abort(self):
+        return self._abort
+
+    def eval_failure(self, guid):
+        """ Implements failure policy and sets the abort state of the
+        experiment based on the failure state and criticality of
+        the RM
+
+        :param guid: Guid of the RM upon which the failure of the experiment
+            is evaluated
+        :type guid: int
+
+        """
         if self._failure_level == FailureLevel.OK:
         if self._failure_level == FailureLevel.OK:
-            for guid in self.ec.resources:
-                state = self.ec.state(guid)
-                critical = self.ec.get(guid, "critical")
-                if state == ResourceState.FAILED and critical:
-                    self._failure_level = FailureLevel.RM_FAILURE
-                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
-                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
-                    break
+            rm = self.ec.get_resource(guid)
+            state = rm.state
+            critical = rm.get("critical")
 
 
-        return self._failure_level != FailureLevel.OK
+            if state == ResourceState.FAILED and critical:
+                self._failure_level = FailureLevel.RM_FAILURE
+                self._abort = True
+                self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                    " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
 
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
 
-
 class ECState(object):
 class ECState(object):
-    """ Possible states for an ExperimentController
+    """ Possible states of the ExperimentController
    
     """
     RUNNING = 1
     FAILED = 2
    
     """
     RUNNING = 1
     FAILED = 2
-    TERMINATED = 3
+    RELEASED = 3
+    TERMINATED = 4
 
 class ExperimentController(object):
     """
 
 class ExperimentController(object):
     """
-    .. class:: Class Args :
-      
-        :param exp_id: Human readable identifier for the experiment scenario. 
-        :type exp_id: str
-
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
     .. note::
 
     An experiment, or scenario, is defined by a concrete set of resources,
-    behavior, configuration and interconnection of those resources. 
+    and the behavior, configuration and interconnection of those resources. 
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
     The Experiment Description (ED) is a detailed representation of a
     single experiment. It contains all the necessary information to 
     allow repeating the experiment. NEPI allows to describe
@@ -111,7 +121,7 @@ class ExperimentController(object):
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
     recreated (and re-run) by instantiating an EC and recreating 
     the same experiment description. 
 
-    In NEPI, an experiment is represented as a graph of interconnected
+    An experiment is represented as a graph of interconnected
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
     resources. A resource is a generic concept in the sense that any
     component taking part of an experiment, whether physical of
     virtual, is considered a resource. A resources could be a host, 
@@ -121,10 +131,9 @@ class ExperimentController(object):
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
     single resource. ResourceManagers are specific to a resource
     type (i.e. An RM to control a Linux application will not be
     the same as the RM used to control a ns-3 simulation).
-    To support a new type of resource in NEPI, a new RM must be 
-    implemented. NEPI already provides a variety of
-    RMs to control basic resources, and new can be extended from
-    the existing ones.
+    To support a new type of resource, a new RM must be implemented. 
+    NEPI already provides a variety of RMs to control basic resources, 
+    and new can be extended from the existing ones.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
 
     Through the EC interface the user can create ResourceManagers (RMs),
     configure them and interconnect them, to describe an experiment.
@@ -146,10 +155,45 @@ class ExperimentController(object):
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
-        
+   
     """
 
     """
 
-    def __init__(self, exp_id = None): 
+    @classmethod
+    def load(cls, filepath, format = SFormats.XML):
+        serializer = ECSerializer()
+        ec = serializer.load(filepath)
+        return ec
+
+    def __init__(self, exp_id = None, local_dir = None, persist = False,
+            fm = None, add_node_callback = None, add_edge_callback = None, 
+            **kwargs):
+        """ ExperimentController entity to model an execute a network 
+        experiment.
+        
+        :param exp_id: Human readable name to identify the experiment
+        :type exp_id: str
+
+        :param local_dir: Path to local directory where to store experiment
+            related files
+        :type local_dir: str
+
+        :param persist: Save an XML description of the experiment after 
+        completion at local_dir
+        :type persist: bool
+
+        :param fm: FailureManager object. If None is given, the default 
+            FailureManager class will be used
+        :type fm: FailureManager
+
+        :param add_node_callback: Callback to invoke for node instantiation
+        when automatic topology creation mode is used 
+        :type add_node_callback: function
+
+        :param add_edge_callback: Callback to invoke for edge instantiation 
+        when automatic topology creation mode is used 
+        :type add_edge_callback: function
+
+        """
         super(ExperimentController, self).__init__()
 
         # Logging
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -167,6 +211,17 @@ class ExperimentController(object):
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
+        # Local path where to store experiment related files (results, etc)
+        if not local_dir:
+            local_dir = tempfile.gettempdir() # /tmp
+
+        self._local_dir = local_dir
+        self._exp_dir = os.path.join(local_dir, self.exp_id)
+        self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+        # If True persist the experiment controller in XML format, after completion
+        self._persist = persist
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -191,32 +246,30 @@ class ExperimentController(object):
         self._stop = False
     
         # Entity in charge of managing system failures
         self._stop = False
     
         # Entity in charge of managing system failures
-        self._fm = FailureManager(self)
+        if not fm:
+            self._fm = FailureManager()
+        self._fm.set_ec(self)
 
         # EC state
         self._state = ECState.RUNNING
 
 
         # EC state
         self._state = ECState.RUNNING
 
-        # Blacklist file for PL nodes
-        nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
-        plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
-        if not os.path.exists(plblacklist_file):
-            if os.path.isdir(nepi_home):
-                open(plblacklist_file, 'w').close()
-            else:
-                os.makedirs(nepi_home)
-                open(plblacklist_file, 'w').close()
-                    
+        # Automatically construct experiment description 
+        self._netgraph = None
+        if add_node_callback or add_edge_callback or kwargs.get("topology"):
+            self._build_from_netgraph(add_node_callback, add_edge_callback, 
+                    **kwargs)
+
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
-        self._runner = ParallelRun(maxthreads = nthreads)
+        self._nthreads = 20
+        self._runner = None
 
         # Event processing thread
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
         self._thread.setDaemon(True)
         self._thread.start()
 
         # Event processing thread
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
         self._thread.setDaemon(True)
         self._thread.start()
-
+        
     @property
     def logger(self):
         """ Returns the logger instance of the Experiment Controller
     @property
     def logger(self):
         """ Returns the logger instance of the Experiment Controller
@@ -224,6 +277,22 @@ class ExperimentController(object):
         """
         return self._logger
 
         """
         return self._logger
 
+    @property
+    def fm(self):
+        """ Returns the failure manager
+
+        """
+
+        return self._fm
+
+    @property
+    def failure_level(self):
+        """ Returns the level of FAILURE of th experiment
+
+        """
+
+        return self._fm._failure_level
+
     @property
     def ecstate(self):
         """ Returns the state of the Experiment Controller
     @property
     def ecstate(self):
         """ Returns the state of the Experiment Controller
@@ -246,6 +315,52 @@ class ExperimentController(object):
         """
         return self._run_id
 
         """
         return self._run_id
 
+    @property
+    def nthreads(self):
+        """ Returns the number of processing nthreads used
+
+        """
+        return self._nthreads
+
+    @property
+    def local_dir(self):
+        """ Root local directory for experiment files
+
+        """
+        return self._local_dir
+
+    @property
+    def exp_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment.
+
+        """
+        return self._exp_dir
+
+    @property
+    def run_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment run.
+
+        """
+        return self._run_dir
+
+    @property
+    def persist(self):
+        """ If True, persists the ExperimentController to XML format upon 
+        experiment completion
+
+        """
+        return self._persist
+
+    @property
+    def netgraph(self):
+        """ Return NetGraph instance if experiment description was automatically 
+        generated
+
+        """
+        return self._netgraph
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -254,6 +369,16 @@ class ExperimentController(object):
         """
         return self._fm.abort
 
         """
         return self._fm.abort
 
+    def inform_failure(self, guid):
+        """ Reports a failure in a RM to the EC for evaluation
+
+            :param guid: Resource id
+            :type guid: int
+
+        """
+
+        return self._fm.eval_failure(guid)
+
     def wait_finished(self, guids):
         """ Blocking method that waits until all RMs in the 'guids' list 
         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
     def wait_finished(self, guids):
         """ Blocking method that waits until all RMs in the 'guids' list 
         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
@@ -327,7 +452,6 @@ class ExperimentController(object):
             :type guids: list
         
         """
             :type guids: list
         
         """
-        
         if isinstance(guids, int):
             guids = [guids]
 
         if isinstance(guids, int):
             guids = [guids]
 
@@ -341,23 +465,46 @@ class ExperimentController(object):
                 break
 
             # If a guid reached one of the target states, remove it from list
                 break
 
             # If a guid reached one of the target states, remove it from list
-            guid = guids[0]
-            rstate = self.state(guid)
+            guid = guids.pop()
+            rm = self.get_resource(guid)
+            rstate = rm.state
             
             
-            hrrstate = ResourceState2str.get(rstate)
-            hrstate = ResourceState2str.get(state)
-
             if rstate >= state:
             if rstate >= state:
-                guids.remove(guid)
-                rm = self.get_resource(guid)
                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
-                    rm.get_rtype(), guid, hrrstate, hrstate))
+                    rm.get_rtype(), guid, rstate, state))
             else:
                 # Debug...
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
             else:
                 # Debug...
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
-                    guid, hrrstate, hrstate))
+                    guid, rstate, state))
+
+                guids.append(guid)
+
                 time.sleep(0.5)
                 time.sleep(0.5)
-  
+
+    def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
+        plotter = ECPlotter()
+        fpath = plotter.plot(self, dirpath = dirpath, format= format, 
+                show = show)
+        return fpath
+
+    def serialize(self, format = SFormats.XML):
+        serializer = ECSerializer()
+        sec = serializer.load(self, format = format)
+        return sec
+
+    def save(self, dirpath = None, format = SFormats.XML):
+        if dirpath == None:
+            dirpath = self.run_dir
+
+        try:
+            os.makedirs(dirpath)
+        except OSError:
+            pass
+
+        serializer = ECSerializer()
+        path = serializer.save(self, dirpath, format = format)
+        return path
+
     def get_task(self, tid):
         """ Returns a task by its id
 
     def get_task(self, tid):
         """ Returns a task by its id
 
@@ -372,23 +519,59 @@ class ExperimentController(object):
     def get_resource(self, guid):
         """ Returns a registered ResourceManager by its guid
 
     def get_resource(self, guid):
         """ Returns a registered ResourceManager by its guid
 
-            :param guid: Id of the task
+            :param guid: Id of the resource
             :type guid: int
             
             :rtype: ResourceManager
             
         """
             :type guid: int
             
             :rtype: ResourceManager
             
         """
-        return self._resources.get(guid)
+        rm = self._resources.get(guid)
+        return rm
+
+    def get_resources_by_type(self, rtype):
+        """ Returns the ResourceManager objects of type rtype
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of ResourceManagers
+            
+        """
+        rms = []
+        for guid, rm in self._resources.items():
+            if rm.get_rtype() == rtype: 
+                rms.append(rm)
+        return rms
+
+    def remove_resource(self, guid):
+        del self._resources[guid]
 
     @property
     def resources(self):
 
     @property
     def resources(self):
-        """ Returns the set() of guids of all the ResourceManager
+        """ Returns the guids of all ResourceManagers 
 
             :return: Set of all RM guids
 
             :return: Set of all RM guids
-            :rtype: set
+            :rtype: list
 
         """
 
         """
-        return self._resources.keys()
+        keys = list(self._resources.keys())
+
+        return keys
+
+    def filter_resources(self, rtype):
+        """ Returns the guids of all ResourceManagers of type rtype
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of guids
+            
+        """
+        rms = []
+        for guid, rm in self._resources.items():
+            if rm.get_rtype() == rtype: 
+                rms.append(rm.guid)
+        return rms
 
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
 
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
@@ -790,8 +973,8 @@ class ExperimentController(object):
         if not guids:
             # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
         if not guids:
             # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
-            for guid in self.resources:
-                if self.state(guid) == ResourceState.NEW:
+            for guid, rm in self._resources.items():
+                if rm.state == ResourceState.NEW:
                     guids.append(guid)
                 
         if isinstance(guids, int):
                     guids.append(guid)
                 
         if isinstance(guids, int):
@@ -869,6 +1052,9 @@ class ExperimentController(object):
             :type guids: list
 
         """
             :type guids: list
 
         """
+        if self._state == ECState.RELEASED:
+            return 
+
         if isinstance(guids, int):
             guids = [guids]
 
         if isinstance(guids, int):
             guids = [guids]
 
@@ -881,9 +1067,15 @@ class ExperimentController(object):
 
         self.wait_released(guids)
 
 
         self.wait_released(guids)
 
+        if self.persist:
+            self.save()
+
         for guid in guids:
             if self.get(guid, "hardRelease"):
         for guid in guids:
             if self.get(guid, "hardRelease"):
-                del self._resources[guid]
+                self.remove_resource(guid)\
+
+        # Mark the EC state as RELEASED
+        self._state = ECState.RELEASED
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
@@ -986,6 +1178,8 @@ class ExperimentController(object):
 
         """
 
 
         """
 
+        self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
+        self._runner = ParallelRun(maxthreads = self.nthreads)
         self._runner.start()
 
         while not self._stop:
         self._runner.start()
 
         while not self._stop:
@@ -1062,3 +1256,19 @@ class ExperimentController(object):
         self._cond.notify()
         self._cond.release()
 
         self._cond.notify()
         self._cond.release()
 
+    def _build_from_netgraph(self, add_node_callback, add_edge_callback, 
+            **kwargs):
+        """ Automates experiment description using a NetGraph instance.
+        """
+        self._netgraph = NetGraph(**kwargs)
+
+        if add_node_callback:
+            ### Add resources to the EC
+            for nid in self.netgraph.nodes():
+                add_node_callback(self, nid)
+
+        if add_edge_callback:
+            #### Add connections between resources
+            for nid1, nid2 in self.netgraph.edges():
+                add_edge_callback(self, nid1, nid2)
+