Modified FailureManager to abort only when critical resources fail
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 24 Oct 2013 13:08:59 +0000 (15:08 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Thu, 24 Oct 2013 13:08:59 +0000 (15:08 +0200)
36 files changed:
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/all/collector.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnapplication.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnping.py
src/nepi/resources/linux/ccn/ccnpingserver.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/channel.py
src/nepi/resources/linux/interface.py
src/nepi/resources/linux/mtr.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/nping.py
src/nepi/resources/linux/ping.py
src/nepi/resources/linux/tcpdump.py
src/nepi/resources/linux/traceroute.py
src/nepi/resources/linux/udptest.py
src/nepi/resources/linux/udptunnel.py
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/omf/omf_resource.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/openvswitch/ovs.py
src/nepi/resources/planetlab/openvswitch/ovsport.py
src/nepi/resources/planetlab/openvswitch/tunnel.py
src/nepi/resources/planetlab/tap.py
src/nepi/util/parallel.py
test/execution/ec.py
test/execution/resource.py
test/resources/planetlab/ovs.py
test/util/parallel.py [new file with mode: 0644]

index 7277ae6..c5978de 100644 (file)
@@ -35,52 +35,48 @@ import random
 import sys
 import time
 import threading
-
-class FailurePolicy(object):
-    """ Defines how to respond to experiment failures  
-    """
-    IGNORE_RM_FAILURE = 1
-    ABORT_ON_RM_FAILURE = 2
+import weakref
 
 class FailureLevel(object):
-    """ Describe the system failure state
+    """ Describes the system failure state
     """
     OK = 1
     RM_FAILURE = 2
-    TASK_FAILURE = 3
-    EC_FAILURE = 4
+    EC_FAILURE = 3
 
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors,
     and deciding whether an experiment should be aborted
     """
 
-    def __init__(self, failure_policy = None):
+    def __init__(self, ec):
+        self._ec = weakref.ref(ec)
         self._failure_level = FailureLevel.OK
-        self._failure_policy = failure_policy or \
-                FailurePolicy.ABORT_ON_RM_FAILURE
 
     @property
-    def abort(self):
-        if self._failure_level == FailureLevel.EC_FAILURE:
-            return True
-
-        if self._failure_level in [FailureLevel.TASK_FAILURE, 
-                FailureLevel.RM_FAILURE] and \
-                        self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
-            return True
-
-        return False
+    def ec(self):
+        """ Returns the Experiment Controller """
+        return self._ec()
 
-    def set_rm_failure(self):
-        self._failure_level = FailureLevel.RM_FAILURE
+    @property
+    def abort(self):
+        if self._failure_level == FailureLevel.OK:
+            for guid in self.ec.resources:
+                state = self.ec.state(guid)
+                critical = self.ec.get(guid, "critical")
+
+                if state == ResourceState.FAILED and critical:
+                    self._failure_level = FailureLevel.RM_FAILURE
+                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+                    break
 
-    def set_task_failure(self):
-        self._failure_level = FailureLevel.TASK_FAILURE
+        return self._failure_level != FailureLevel.OK
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
 
+
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -175,7 +171,9 @@ class ExperimentController(object):
         # Resource managers
         self._resources = dict()
 
-        # Scheduler
+        # Scheduler. It a queue that holds tasks scheduled for
+        # execution, and yields the next task to be executed 
+        # ordered by execution and arrival time
         self._scheduler = HeapScheduler()
 
         # Tasks
@@ -186,22 +184,27 @@ class ExperimentController(object):
 
         # generator of globally unique id for groups
         self._group_id_generator = guid.GuidGenerator()
-        # Event processing thread
-        self._cond = threading.Condition()
-        self._thread = threading.Thread(target = self._process)
-        self._thread.setDaemon(True)
-        self._thread.start()
 
         # Flag to stop processing thread
         self._stop = False
     
         # Entity in charge of managing system failures
-        self._fm = FailureManager()
+        self._fm = FailureManager(self)
 
         # EC state
         self._state = ECState.RUNNING
 
+        # The runner is a pool of threads used to parallelize 
+        # execution of tasks
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+        self._runner = ParallelRun(maxthreads = nthreads)
+
+        # Event processing thread
+        self._cond = threading.Condition()
+        self._thread = threading.Thread(target = self._process)
+        self._thread.setDaemon(True)
+        self._thread.start()
+
     @property
     def logger(self):
         """ Return the logger of the Experiment Controller
@@ -234,9 +237,6 @@ class ExperimentController(object):
     def abort(self):
         return self._fm.abort
 
-    def set_rm_failure(self):
-        self._fm.set_rm_failure()
-
     def wait_finished(self, guids):
         """ Blocking method that wait until all RMs in the 'guid' list 
             reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
@@ -316,17 +316,19 @@ class ExperimentController(object):
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
             rstate = self.state(guid)
+            
+            hrrstate = ResourceState2str.get(rstate)
+            hrstate = ResourceState2str.get(state)
 
             if rstate >= state:
                 guids.remove(guid)
+                self.logger.debug(" guid %d DONE - state is %s, required is >= %s " % (
+                    guid, hrrstate, hrstate))
             else:
                 # Debug...
-                hrrstate = ResourceState2str.get(rstate)
-                hrstate = ResourceState2str.get(state)
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
                     guid, hrrstate, hrstate))
-
-            time.sleep(0.5)
+                time.sleep(0.5)
   
     def get_task(self, tid):
         """ Get a specific task
@@ -694,8 +696,10 @@ class ExperimentController(object):
             guids = self.resources
 
         # Remove all pending tasks from the scheduler queue
-        for tis in self._scheduler.pending:
-            self._scheduler.remove(tis)
+        for tid in list(self._scheduler.pending):
+            self._scheduler.remove(tid)
+
+        self._runner.empty()
 
         for guid in guids:
             rm = self.get_resource(guid)
@@ -708,6 +712,10 @@ class ExperimentController(object):
         Releases all the resources and stops task processing thread
 
         """
+        # If there was a major failure we can't exit gracefully
+        if self._state == ECState.FAILED:
+            raise RuntimeError("EC failure. Can not exit gracefully")
+
         self.release()
 
         # Mark the EC state as TERMINATED
@@ -788,10 +796,8 @@ class ExperimentController(object):
         that might have been raised by the workers.
 
         """
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
-        runner = ParallelRun(maxthreads = nthreads)
-        runner.start()
+        self._runner.start()
 
         while not self._stop:
             try:
@@ -822,7 +828,7 @@ class ExperimentController(object):
 
                 if task:
                     # Process tasks in parallel
-                    runner.put(self._execute, task)
+                    self._runner.put(self._execute, task)
             except: 
                 import traceback
                 err = traceback.format_exc()
@@ -830,13 +836,14 @@ class ExperimentController(object):
 
                 # Set the EC to FAILED state 
                 self._state = ECState.FAILED
-
-                # Set the FailureManager failure level
+            
+                # Set the FailureManager failure level to EC failure
                 self._fm.set_ec_failure()
 
         self.logger.debug("Exiting the task processing loop ... ")
-        runner.sync()
-        runner.destroy()
+        
+        self._runner.sync()
+        self._runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
@@ -864,9 +871,6 @@ class ExperimentController(object):
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
-            # Set the FailureManager failure level
-            self._fm.set_task_failure()
-
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
         for a new task to be scheduled.
index 18edeb0..b4994e0 100644 (file)
@@ -19,6 +19,7 @@
 
 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
 from nepi.util.logger import Logger
+from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import TraceAttr
 
 import copy
@@ -80,6 +81,20 @@ def clsinit_copy(cls):
     cls._clsinit_copy()
     return cls
 
+def failtrap(func):
+    def wrapped(self, *args, **kwargs):
+        try:
+            return func(self, *args, **kwargs)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
+            self.debug("SETTING guid %d to state FAILED" % self.guid)
+            self.fail()
+            raise
+    
+    return wrapped
+
 # Decorator to invoke class initialization method
 @clsinit
 class ResourceManager(Logger):
@@ -138,8 +153,14 @@ class ResourceManager(Logger):
         resource attributes
 
         """
-        pass
+        critical = Attribute("critical", "Defines whether the resource is critical. "
+                " A failure on a critical resource will interrupt the experiment. ",
+                type = Types.Bool,
+                default = True,
+                flags = Flags.ExecReadOnly)
 
+        cls._register_attribute(critical)
+        
     @classmethod
     def _register_traces(cls):
         """ Resource subclasses will invoke this method to register
@@ -309,7 +330,7 @@ class ResourceManager(Logger):
 
     @property
     def state(self):
-        """ Get the state of the current RM """
+        """ Get the current state of the RM """
         return self._state
 
     def log_message(self, msg):
@@ -344,27 +365,42 @@ class ResourceManager(Logger):
     def discover(self):
         """ Performs resource discovery.
 
-        This  method is resposible for selecting an individual resource
+        This  method is responsible for selecting an individual resource
         matching user requirements.
         This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
         """
         self.set_discovered()
 
     def provision(self):
         """ Performs resource provisioning.
 
-        This  method is resposible for provisioning one resource.
+        This  method is responsible for provisioning one resource.
         After this method has been successfully invoked, the resource
-        should be acccesible/controllable by the RM.
+        should be accessible/controllable by the RM.
         This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
         """
         self.set_provisioned()
 
     def start(self):
-        """ Starts the resource.
+        """ Starts the RM.
         
         There is no generic start behavior for all resources.
         This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
         """
         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
             self.error("Wrong state %s for start" % self.state)
@@ -373,10 +409,15 @@ class ResourceManager(Logger):
         self.set_started()
 
     def stop(self):
-        """ Stops the resource.
+        """ Interrupts the RM, stopping any tasks the RM was performing.
         
         There is no generic stop behavior for all resources.
         This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
         """
         if not self.state in [ResourceState.STARTED]:
             self.error("Wrong state %s for stop" % self.state)
@@ -385,7 +426,15 @@ class ResourceManager(Logger):
         self.set_stopped()
 
     def deploy(self):
-        """ Execute all steps required for the RM to reach the state READY
+        """ Execute all steps required for the RM to reach the state READY.
+
+        This  method is responsible for deploying the resource (and invoking the
+        discover and provision methods).
+        This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
 
         """
         if self.state > ResourceState.READY:
@@ -396,14 +445,41 @@ class ResourceManager(Logger):
         self.set_ready()
 
     def release(self):
+        """ Perform actions to free resources used by the RM.
+        
+        This  method is responsible for releasing resources that were
+        used during the experiment by the RM.
+        This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, this method should never
+        raise an error and it must ensure the RM is set to state RELEASED.
+
+        """
         self.set_released()
 
     def finish(self):
+        """ Sets the RM to state FINISHED. 
+        
+        The FINISHED state is different from STOPPED in that it should not be 
+        directly invoked by the user.
+        STOPPED indicates that the user interrupted the RM, FINISHED means
+        that the RM concluded normally the actions it was supposed to perform.
+        This method should be redefined when necessary in child classes.
+        
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
+        """
+
         self.set_finished()
  
     def fail(self):
+        """ Sets the RM to state FAILED.
+
+        """
+
         self.set_failed()
-        self.ec.set_rm_failure()
 
     def set(self, name, value):
         """ Set the value of the attribute
@@ -641,9 +717,11 @@ class ResourceManager(Logger):
         reschedule = False
         delay = reschedule_delay 
 
-        ## evaluate if set conditions are met
+        ## evaluate if conditions to start are met
+        if self.ec.abort:
+            return 
 
-        # only can start when RM is either STOPPED or READY
+        # Can only start when RM is either STOPPED or READY
         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
             reschedule = True
             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
@@ -680,11 +758,14 @@ class ResourceManager(Logger):
         reschedule = False
         delay = reschedule_delay 
 
-        ## evaluate if set conditions are met
+        ## evaluate if conditions to stop are met
+        if self.ec.abort:
+            return 
 
         # only can stop when RM is STARTED
         if self.state != ResourceState.STARTED:
             reschedule = True
+            self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
         else:
             self.debug(" ---- STOP CONDITIONS ---- %s" % 
                     self.conditions.get(ResourceAction.STOP))
@@ -710,7 +791,9 @@ class ResourceManager(Logger):
         reschedule = False
         delay = reschedule_delay 
 
-        ## evaluate if set conditions are met
+        ## evaluate if conditions to deploy are met
+        if self.ec.abort:
+            return 
 
         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
@@ -769,43 +852,43 @@ class ResourceManager(Logger):
     
     def set_started(self):
         """ Mark ResourceManager as STARTED """
-        self._start_time = tnow()
-        self._state = ResourceState.STARTED
+        self.set_state(ResourceState.STARTED, "_start_time")
         
     def set_stopped(self):
         """ Mark ResourceManager as STOPPED """
-        self._stop_time = tnow()
-        self._state = ResourceState.STOPPED
+        self.set_state(ResourceState.STOPPED, "_stop_time")
 
     def set_ready(self):
         """ Mark ResourceManager as READY """
-        self._ready_time = tnow()
-        self._state = ResourceState.READY
+        self.set_state(ResourceState.READY, "_ready_time")
 
     def set_released(self):
         """ Mark ResourceManager as REALEASED """
-        self._release_time = tnow()
-        self._state = ResourceState.RELEASED
+        self.set_state(ResourceState.RELEASED, "_release_time")
 
     def set_finished(self):
         """ Mark ResourceManager as FINISHED """
-        self._finish_time = tnow()
-        self._state = ResourceState.FINISHED
+        self.set_state(ResourceState.FINISHED, "_finish_time")
 
     def set_failed(self):
         """ Mark ResourceManager as FAILED """
-        self._failed_time = tnow()
-        self._state = ResourceState.FAILED
+        self.set_state(ResourceState.FAILED, "_failed_time")
 
     def set_discovered(self):
         """ Mark ResourceManager as DISCOVERED """
-        self._discover_time = tnow()
-        self._state = ResourceState.DISCOVERED
+        self.set_state(ResourceState.DISCOVERED, "_discover_time")
 
     def set_provisioned(self):
         """ Mark ResourceManager as PROVISIONED """
-        self._provision_time = tnow()
-        self._state = ResourceState.PROVISIONED
+        self.set_state(ResourceState.PROVISIONED, "_provision_time")
+
+    def set_state(self, state, state_time_attr):
+        # Ensure that RM state will not change after released
+        if self._state == ResourceState.RELEASED:
+            return 
+   
+        setattr(self, state_time_attr, tnow())
+        self._state = state
 
 class ResourceFactory(object):
     _resource_types = dict()
index 812a939..864750e 100644 (file)
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        ResourceAction
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, ResourceAction, failtrap
 from nepi.util.sshfuncs import ProcStatus
 
 import os
 import tempfile
 
-@clsinit
+@clsinit_copy
 class Collector(ResourceManager):
     """ The collector is reponsible of collecting traces
     of a same type associated to RMs into a local directory.
@@ -69,7 +69,8 @@ class Collector(ResourceManager):
     @property
     def store_path(self):
         return self._store_path
-    
+   
+    @failtrap
     def provision(self):
         trace_name = self.get("traceName")
         if not trace_name:
@@ -97,38 +98,40 @@ class Collector(ResourceManager):
 
         super(Collector, self).provision()
 
+    @failtrap
     def deploy(self):
-        try:
-            self.discover()
-            self.provision()
-        except:
-            self.fail()
-            raise
+        self.discover()
+        self.provision()
 
         super(Collector, self).deploy()
 
     def release(self):
-        trace_name = self.get("traceName")
-        rename = self.get("rename") or trace_name
-
-        msg = "Collecting '%s' traces to local directory %s" % (
-            trace_name, self.store_path)
-        self.info(msg)
-
-        rms = self.get_connected()
-        for rm in rms:
-            result = self.ec.trace(rm.guid, trace_name)
-            fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, 
-                rename))
-            try:
-                f = open(fpath, "w")
-                f.write(result)
-                f.close()
-            except:
-                msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, 
-                        rm.guid, fpath)
-                self.error(msg)
-                continue
+        try:
+            trace_name = self.get("traceName")
+            rename = self.get("rename") or trace_name
+
+            msg = "Collecting '%s' traces to local directory %s" % (
+                trace_name, self.store_path)
+            self.info(msg)
+
+            rms = self.get_connected()
+            for rm in rms:
+                result = self.ec.trace(rm.guid, trace_name)
+                fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, 
+                    rename))
+                try:
+                    f = open(fpath, "w")
+                    f.write(result)
+                    f.close()
+                except:
+                    msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, 
+                            rm.guid, fpath)
+                    self.error(msg)
+                    continue
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(Collector, self).release()
 
index 763106c..1848e9a 100644 (file)
@@ -19,8 +19,8 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -31,7 +31,7 @@ import subprocess
 # TODO: Resolve wildcards in commands!!
 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 
-@clsinit
+@clsinit_copy
 class LinuxApplication(ResourceManager):
     """
     .. class:: Class Args :
@@ -85,7 +85,6 @@ class LinuxApplication(ResourceManager):
     _help = "Runs an application on a Linux host with a BASH command "
     _backend_type = "linux"
 
-
     @classmethod
     def _register_attributes(cls):
         command = Attribute("command", "Command to execute at application start. "
@@ -270,7 +269,8 @@ class LinuxApplication(ResourceManager):
             out = int(out.strip())
 
         return out
-            
+
+    @failtrap
     def provision(self):
         # create run dir for application
         self.node.mkdir(self.run_home)
@@ -302,7 +302,8 @@ class LinuxApplication(ResourceManager):
         # each step we check that the EC is still 
         for step in steps:
             if self.ec.abort:
-                raise RuntimeError, "EC finished"
+                self.debug("Interrupting provisioning. EC says 'ABORT")
+                return
             
             ret = step()
             if ret:
@@ -470,6 +471,7 @@ class LinuxApplication(ResourceManager):
             # replace application specific paths in the command
             return self.replace_paths(install)
 
+    @failtrap
     def deploy(self):
         # Wait until node is associated and deployed
         node = self.node
@@ -477,17 +479,14 @@ class LinuxApplication(ResourceManager):
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                command = self.get("command") or ""
-                self.info("Deploying command '%s' " % command)
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            command = self.get("command") or ""
+            self.info("Deploying command '%s' " % command)
+            self.discover()
+            self.provision()
 
             super(LinuxApplication, self).deploy()
-    
+   
+    @failtrap
     def start(self):
         command = self.get("command")
 
@@ -498,15 +497,10 @@ class LinuxApplication(ResourceManager):
             # installation), then the application is directly marked as FINISHED
             self.set_finished()
         else:
-
-            try:
-                if self.in_foreground:
-                    self._run_in_foreground()
-                else:
-                    self._run_in_background()
-            except:
-                self.fail()
-                return
+            if self.in_foreground:
+                self._run_in_foreground()
+            else:
+                self._run_in_background()
 
             super(LinuxApplication, self).start()
 
@@ -514,6 +508,7 @@ class LinuxApplication(ResourceManager):
         command = self.get("command")
         sudo = self.get("sudo") or False
         x11 = self.get("forwardX11")
+        env = self.get("env")
 
         # For a command being executed in foreground, if there is stdin,
         # it is expected to be text string not a file or pipe
@@ -526,7 +521,7 @@ class LinuxApplication(ResourceManager):
         # to be able to kill the process from the stop method.
         # We also set blocking = False, since we don't want the
         # thread to block until the execution finishes.
-        (out, err), self._proc = self.execute_command(self, command, 
+        (out, err), self._proc = self.execute_command(command, 
                 env = env,
                 sudo = sudo,
                 stdin = stdin,
@@ -582,7 +577,8 @@ class LinuxApplication(ResourceManager):
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
-        
+    
+    @failtrap
     def stop(self):
         """ Stops application execution
         """
@@ -609,22 +605,25 @@ class LinuxApplication(ResourceManager):
                     if proc.poll() or err:
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
-                        self.fail()
-                        return
         
             super(LinuxApplication, self).stop()
 
     def release(self):
         self.info("Releasing resource")
 
-        tear_down = self.get("tearDown")
-        if tear_down:
-            self.node.execute(tear_down)
+        try:
+            tear_down = self.get("tearDown")
+            if tear_down:
+                self.node.execute(tear_down)
 
-        self.stop()
+            self.stop()
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(LinuxApplication, self).release()
-   
+        
     @property
     def state(self):
         """ Returns the state of the application
index 46a3cc4..c13c092 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    reschedule_delay
+    reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -44,25 +44,22 @@ class LinuxCCNApplication(LinuxApplication):
         if self.ccnd: return self.ccnd.node
         return None
 
+    @failtrap
     def deploy(self):
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                command = self.get("command") or ""
+            command = self.get("command") or ""
 
-                self.info("Deploying command '%s' " % command)
-                
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            self.info("Deploying command '%s' " % command)
+            
+            if not self.get("env"):
+                self.set("env", self._environment)
+
+            self.discover()
+            self.provision()
 
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
             self.debug("----- READY ---- ")
             self.set_ready()
 
index 1fa93cc..bb2ae46 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction, reschedule_delay
+    ResourceAction, reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnr import LinuxCCNR
 from nepi.util.timefuncs import tnow
@@ -72,6 +72,7 @@ class LinuxCCNContent(LinuxApplication):
         if self.ccnr: return self.ccnr.node
         return None
 
+    @failtrap
     def deploy(self):
         if not self.ccnr or self.ccnr.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
@@ -79,26 +80,22 @@ class LinuxCCNContent(LinuxApplication):
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                if not self.get("command"):
-                    self.set("command", self._start_command)
+            if not self.get("command"):
+                self.set("command", self._start_command)
 
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
-                # set content to stdin, so the content will be
-                # uploaded during provision
-                self.set("stdin", self.get("content"))
+            # set content to stdin, so the content will be
+            # uploaded during provision
+            self.set("stdin", self.get("content"))
 
-                command = self.get("command")
+            command = self.get("command")
 
-                self.info("Deploying command '%s' " % command)
+            self.info("Deploying command '%s' " % command)
 
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return 
+            self.discover()
+            self.provision()
 
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -125,6 +122,7 @@ class LinuxCCNContent(LinuxApplication):
             self.error(msg, out, err)
             raise RuntimeError, msg
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -134,7 +132,7 @@ class LinuxCCNContent(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            sef.fail()
+            raise RuntimeError, msg
 
     @property
     def _start_command(self):
index 71eb12a..0962936 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
-        ResourceState, reschedule_delay
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -136,6 +136,7 @@ class LinuxCCND(LinuxApplication):
     def path(self):
         return "PATH=$PATH:${BIN}/%s/" % self.version 
 
+    @failtrap
     def deploy(self):
         if not self.node or self.node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
@@ -143,42 +144,38 @@ class LinuxCCND(LinuxApplication):
             # ccnd needs to wait until node is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                if not self.get("command"):
-                    self.set("command", self._start_command)
-                
-                if not self.get("depends"):
-                    self.set("depends", self._dependencies)
+            if not self.get("command"):
+                self.set("command", self._start_command)
+            
+            if not self.get("depends"):
+                self.set("depends", self._dependencies)
 
-                if not self.get("sources"):
-                    self.set("sources", self._sources)
+            if not self.get("sources"):
+                self.set("sources", self._sources)
 
-                sources = self.get("sources")
-                source = sources.split(" ")[0]
-                basename = os.path.basename(source)
-                self._version = ( basename.strip().replace(".tar.gz", "")
-                        .replace(".tar","")
-                        .replace(".gz","")
-                        .replace(".zip","") )
+            sources = self.get("sources")
+            source = sources.split(" ")[0]
+            basename = os.path.basename(source)
+            self._version = ( basename.strip().replace(".tar.gz", "")
+                    .replace(".tar","")
+                    .replace(".gz","")
+                    .replace(".zip","") )
 
-                if not self.get("build"):
-                    self.set("build", self._build)
+            if not self.get("build"):
+                self.set("build", self._build)
 
-                if not self.get("install"):
-                    self.set("install", self._install)
+            if not self.get("install"):
+                self.set("install", self._install)
 
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
-                command = self.get("command")
+            command = self.get("command")
 
-                self.info("Deploying command '%s' " % command)
+            self.info("Deploying command '%s' " % command)
 
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            self.discover()
+            self.provision()
 
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -202,6 +199,7 @@ class LinuxCCND(LinuxApplication):
                 env = env,
                 raise_on_error = True)
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -211,8 +209,9 @@ class LinuxCCND(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
+    @failtrap
     def stop(self):
         command = self.get('command') or ''
         
@@ -245,7 +244,7 @@ class LinuxCCND(LinuxApplication):
         state_check_delay = 0.5
         if self._state == ResourceState.STARTED and \
                 tdiffsec(tnow(), self._last_state_check) > state_check_delay:
-            (out, err), proc = self._ccndstatus
+            (out, err), proc = self._ccndstatus()
 
             retcode = proc.poll()
 
@@ -262,7 +261,6 @@ class LinuxCCND(LinuxApplication):
 
         return self._state
 
-    @property
     def _ccndstatus(self):
         env = self.get('env') or ""
         environ = self.node.format_environment(env, inline = True)
index 7821597..d828950 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-    reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.ccn.ccnpingserver import LinuxCCNPingServer
 from nepi.util.timefuncs import tnow, tdiffsec
 
@@ -65,6 +65,7 @@ class LinuxCCNPing(LinuxCCNPingServer):
         if ccnpingserver: return ccnpingserver[0]
         return None
 
+    @failtrap
     def start(self):
         if not self.ccnpingserver or \
                 self.ccnpingserver.state < ResourceState.STARTED:
index b566c78..6e87b27 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-    reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
 from nepi.util.timefuncs import tnow, tdiffsec
 
@@ -54,6 +54,7 @@ class LinuxCCNPingServer(LinuxCCNApplication):
         super(LinuxCCNPingServer, self).__init__(ec, guid)
         self._home = "ccnping-serv-%s" % self.guid
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 46c3f3b..5319997 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction, reschedule_delay
+    ResourceAction, reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow
@@ -200,6 +200,7 @@ class LinuxCCNR(LinuxApplication):
         if self.ccnd: return self.ccnd.node
         return None
 
+    @failtrap
     def deploy(self):
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
@@ -207,22 +208,18 @@ class LinuxCCNR(LinuxApplication):
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                if not self.get("command"):
-                    self.set("command", self._start_command)
+            if not self.get("command"):
+                self.set("command", self._start_command)
 
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
-                command = self.get("command")
+            command = self.get("command")
 
-                self.info("Deploying command '%s' " % command)
+            self.info("Deploying command '%s' " % command)
 
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return 
+            self.discover()
+            self.provision()
 
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -255,6 +252,7 @@ class LinuxCCNR(LinuxApplication):
                 env = env,
                 raise_on_error = True)
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -264,7 +262,7 @@ class LinuxCCNR(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
     @property
     def _start_command(self):
index 48c2d6d..490d8f6 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction, reschedule_delay
+    ResourceAction, reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow
@@ -108,35 +108,32 @@ class LinuxFIBEntry(LinuxApplication):
             return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
 
         return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
-        
+    
+    @failtrap
     def deploy(self):
         # Wait until associated ccnd is provisioned
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                if not self.get("ip"):
-                    host = self.get("host")
-                    ip = socket.gethostbyname(host)
-                    self.set("ip", ip)
+            if not self.get("ip"):
+                host = self.get("host")
+                ip = socket.gethostbyname(host)
+                self.set("ip", ip)
 
-                if not self.get("command"):
-                    self.set("command", self._start_command)
+            if not self.get("command"):
+                self.set("command", self._start_command)
 
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
-                command = self.get("command")
+            command = self.get("command")
 
-                self.info("Deploying command '%s' " % command)
+            self.info("Deploying command '%s' " % command)
 
-                self.discover()
-                self.provision()
-                self.configure()
-            except:
-                self.fail()
-                return
+            self.discover()
+            self.provision()
+            self.configure()
 
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -194,6 +191,7 @@ class LinuxFIBEntry(LinuxApplication):
             # schedule mtr deploy
             self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -203,8 +201,9 @@ class LinuxFIBEntry(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
+    @failtrap
     def stop(self):
         command = self.get('command')
         env = self.get('env')
@@ -221,7 +220,7 @@ class LinuxFIBEntry(LinuxApplication):
             if err:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
-                self.fail()
+                raise RuntimeError, msg
 
     @property
     def _start_command(self):
index 2ed5c01..429051e 100644 (file)
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState
 
-@clsinit
+@clsinit_copy
 class LinuxChannel(ResourceManager):
     _rtype = "LinuxChannel"
     _help = "Represents a wireless channel on a network of Linux hosts"
@@ -35,3 +36,4 @@ class LinuxChannel(ResourceManager):
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
+
index a170f41..9ccdc4f 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Types, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.linux.channel import LinuxChannel
 
@@ -33,7 +33,7 @@ import time
 # TODO: UP, MTU attributes!
 
 
-@clsinit
+@clsinit_copy
 class LinuxInterface(ResourceManager):
     _rtype = "LinuxInterface"
     _help = "Controls network devices on Linux hosts through the ifconfig tool"
@@ -102,6 +102,7 @@ class LinuxInterface(ResourceManager):
         if chan: return chan[0]
         return None
 
+    @failtrap
     def discover(self):
         devname = self.get("deviceName")
         ip4 = self.get("ip4")
@@ -183,6 +184,7 @@ class LinuxInterface(ResourceManager):
 
         super(LinuxInterface, self).discover()
 
+    @failtrap
     def provision(self):
         devname = self.get("deviceName")
         ip4 = self.get("ip4")
@@ -226,6 +228,7 @@ class LinuxInterface(ResourceManager):
 
         super(LinuxInterface, self).provision()
 
+    @failtrap
     def deploy(self):
         # Wait until node is provisioned
         node = self.node
@@ -238,19 +241,20 @@ class LinuxInterface(ResourceManager):
         else:
             # Verify if the interface exists in node. If not, configue
             # if yes, load existing configuration
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return 
+            self.discover()
+            self.provision()
 
             super(LinuxInterface, self).deploy()
 
     def release(self):
-        tear_down = self.get("tearDown")
-        if tear_down:
-            self.execute(tear_down)
+        try:
+            tear_down = self.get("tearDown")
+            if tear_down:   
+                self.execute(tear_down)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(LinuxInterface, self).release()
 
index 85a9f25..6085721 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -83,6 +83,7 @@ class LinuxMtr(LinuxApplication):
         self._home = "mtr-%s" % self.guid
         self._sudo_kill = True
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 9cab4ca..953793a 100644 (file)
@@ -17,9 +17,9 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
@@ -57,7 +57,7 @@ class OSType:
     UBUNTU = "ubuntu"
     DEBIAN = "debian"
 
-@clsinit
+@clsinit_copy
 class LinuxNode(ResourceManager):
     """
     .. class:: Class Args :
@@ -168,14 +168,20 @@ class LinuxNode(ResourceManager):
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
@@ -309,7 +315,6 @@ class LinuxNode(ResourceManager):
             time.sleep(min(30.0, retrydelay))
             retrydelay *= 1.5
 
-
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
@@ -323,10 +328,10 @@ class LinuxNode(ResourceManager):
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
+    @failtrap
     def provision(self):
         # check if host is alive
         if not self.is_alive():
-            
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
@@ -353,14 +358,12 @@ class LinuxNode(ResourceManager):
 
         super(LinuxNode, self).provision()
 
+    @failtrap
     def deploy(self):
         if self.state == ResourceState.NEW:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            self.info("Deploying node")
+            self.discover()
+            self.provision()
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
@@ -374,19 +377,24 @@ class LinuxNode(ResourceManager):
         super(LinuxNode, self).deploy()
 
     def release(self):
-        # Node needs to wait until all associated RMs are released
-        # to be released
-        rms = self.get_connected()
-        for rm in rms:
-            if rm.state < ResourceState.STOPPED:
-                self.ec.schedule(reschedule_delay, self.release)
-                return 
-
-        tear_down = self.get("tearDown")
-        if tear_down:
-            self.execute(tear_down)
+        try:
+            rms = self.get_connected()
+            for rm in rms:
+                # Node needs to wait until all associated RMs are released
+                # before it can be released
+                if rm.state < ResourceState.STOPPED:
+                    self.ec.schedule(reschedule_delay, self.release)
+                    return 
+
+            tear_down = self.get("tearDown")
+            if tear_down:
+                self.execute(tear_down)
 
-        self.clean_processes()
+            self.clean_processes()
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(LinuxNode, self).release()
 
@@ -627,7 +635,6 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
-
     def upload(self, src, dst, text = False, overwrite = True):
         """ Copy content to destination
 
index e0d6452..ec874bc 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -133,6 +133,7 @@ class LinuxNPing(LinuxApplication):
         self._home = "nping-%s" % self.guid
         self._sudo_kill = True
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 3447658..d085b11 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -184,6 +184,7 @@ class LinuxPing(LinuxApplication):
         super(LinuxPing, self).__init__(ec, guid)
         self._home = "ping-%s" % self.guid
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 3fa11f2..e9955f4 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -316,6 +316,7 @@ class LinuxTcpdump(LinuxApplication):
         self._home = "tcpdump-%s" % self.guid
         self._sudo_kill = True
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 2e03f62..99eea6b 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -64,6 +64,7 @@ class LinuxTraceroute(LinuxApplication):
         super(LinuxTraceroute, self).__init__(ec, guid)
         self._home = "traceroute-%s" % self.guid
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 1b7ac9c..6ad0085 100644 (file)
@@ -19,8 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-        reschedule_delay
-from nepi.execution.resource import clsinit_copy 
+        reschedule_delay, failtrap 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -214,6 +213,7 @@ class LinuxUdpTest(LinuxApplication):
         super(LinuxUdpTest, self).__init__(ec, guid)
         self._home = "udptest-%s" % self.guid
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
@@ -247,6 +247,7 @@ class LinuxUdpTest(LinuxApplication):
             # finished to continue )
             self._run_in_background()
     
+    @failtrap
     def start(self):
         if self.get("s") == True:
             # Server is already running
@@ -258,7 +259,7 @@ class LinuxUdpTest(LinuxApplication):
             else:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
-                self.fail()
+                raise RuntimeError, err
         else:
             super(LinuxUdpTest, self).start()
  
index 4dae96c..1efe230 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-        reschedule_delay
+        reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -161,6 +161,7 @@ class UdpTunnel(LinuxApplication):
         port = self.wait_local_port(endpoint)
         return (port, pid, ppid)
 
+    @failtrap
     def provision(self):
         # create run dir for tunnel on each node 
         self.endpoint1.node.mkdir(self.run_home(self.endpoint1))
@@ -190,21 +191,19 @@ class UdpTunnel(LinuxApplication):
  
         self.set_provisioned()
 
+    @failtrap
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            self.discover()
+            self.provision()
  
             self.debug("----- READY ---- ")
             self.set_ready()
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -214,8 +213,9 @@ class UdpTunnel(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
+    @failtrap
     def stop(self):
         """ Stops application execution
         """
@@ -234,8 +234,7 @@ class UdpTunnel(LinuxApplication):
                     # check if execution errors occurred
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
-                    self.fail()
-                    return
+                    raise RuntimeError, msg
 
             self.set_stopped()
 
index f0fc5f1..54f8854 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.node import OMFNode
@@ -147,6 +147,7 @@ class OMFApplication(OMFResource):
 
             return True
 
+    @failtrap
     def deploy(self):
         """ Deploy the RM. It means nothing special for an application 
         for now (later it will be upload sources, ...)
@@ -166,8 +167,7 @@ class OMFApplication(OMFResource):
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if self.get('sources'):
             gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
@@ -177,7 +177,7 @@ class OMFApplication(OMFResource):
 
         super(OMFApplication, self).deploy()
 
-
+    @failtrap
     def start(self):
         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
          to execute the application. 
@@ -187,8 +187,7 @@ class OMFApplication(OMFResource):
         if not (self.get('appid') and self.get('path')) :
             msg = "Application's information are not initialized"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if not self.get('args'):
             self.set('args', " ")
@@ -207,11 +206,11 @@ class OMFApplication(OMFResource):
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
             raise
 
         super(OMFApplication, self).start()
 
+    @failtrap
     def stop(self):
         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
         kill the application. 
@@ -223,8 +222,7 @@ class OMFApplication(OMFResource):
         except AttributeError:
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise
 
         super(OMFApplication, self).stop()
         self.set_finished()
@@ -233,10 +231,15 @@ class OMFApplication(OMFResource):
         """ Clean the RM at the end of the experiment and release the API.
 
         """
-        if self._omf_api :
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        try:
+            if self._omf_api :
+                OMFAPIFactory.release_api(self.get('xmppSlice'), 
+                    self.get('xmppHost'), self.get('xmppPort'), 
+                    self.get('xmppPassword'), exp_id = self.exp_id)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(OMFApplication, self).release()
 
index b5d96a3..0e99567 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
@@ -121,18 +121,7 @@ class OMFChannel(OMFResource):
                     res.append(couple)
         return res
 
-    def discover(self):
-        """ Discover the availables channels
-
-        """
-        pass
-     
-    def provision(self):
-        """ Provision some availables channels
-
-        """
-        pass
-
+    @failtrap
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the channel.
@@ -147,58 +136,44 @@ class OMFChannel(OMFResource):
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if not self.get('channel'):
             msg = "Channel's value is not initialized"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
+
+        self._nodes_guid = self._get_target(self._connections)
 
-        self._nodes_guid = self._get_target(self._connections) 
         if self._nodes_guid == "reschedule" :
             self.ec.schedule("2s", self.deploy)
-            return False
-
-        try:
-            for couple in self._nodes_guid:
-                #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
-                attrval = self.get('channel')
-                attrname = "net/%s/%s" % (couple[1], 'channel')
-                self._omf_api.configure(couple[0], attrname, attrval)
-        except AttributeError:
-            msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.error(msg)
-            self.fail()
-            return
-
-        super(OMFChannel, self).deploy()
-
-    def start(self):
-        """ Start the RM. It means nothing special for a channel for now
-        It becomes STARTED as soon as this method starts.
-
-        """
-
-        super(OMFChannel, self).start()
-
-    def stop(self):
-        """ Stop the RM. It means nothing special for a channel for now
-        It becomes STOPPED as soon as this method is called
-
-        """
-        super(OMFChannel, self).stop()
-        self.set_finished()
+        else:
+            try:
+                for couple in self._nodes_guid:
+                    #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
+                    attrval = self.get('channel')
+                    attrname = "net/%s/%s" % (couple[1], 'channel')
+                    self._omf_api.configure(couple[0], attrname, attrval)
+            except AttributeError:
+                msg = "Credentials are not initialzed. XMPP Connections impossible"
+                self.error(msg)
+                raise
+
+            super(OMFChannel, self).deploy()
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
-        if self._omf_api :
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        try:
+            if self._omf_api :
+                OMFAPIFactory.release_api(self.get('xmppSlice'), 
+                    self.get('xmppHost'), self.get('xmppPort'), 
+                    self.get('xmppPassword'), exp_id = self.exp_id)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(OMFChannel, self).release()
 
index e61ad41..81e3b60 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.node import OMFNode
@@ -120,7 +120,6 @@ class OMFWifiInterface(OMFResource):
         if rm_list: return rm_list[0]
         return None
 
-
     def configure_iface(self):
         """ Configure the interface without the ip
 
@@ -165,6 +164,7 @@ class OMFWifiInterface(OMFResource):
 
         return True
 
+    @failtrap
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the interface.
@@ -183,21 +183,18 @@ class OMFWifiInterface(OMFResource):
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if not (self.get('mode') and self.get('type') and self.get('essid') \
                 and self.get('ip')):
             msg = "Interface's variable are not initialized"
             self.error(msg)
-            self.fail()
-            return False
+            raise RuntimeError, msg
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
-            self.fail()
-            return False
+            raise RuntimeError, msg
 
         # Just for information
         self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
@@ -205,43 +202,25 @@ class OMFWifiInterface(OMFResource):
             self.get('essid') + " : " + self.get('ip'))
     
         # Check if the node is already deployed
-        chk1 = True
         if self.state < ResourceState.PROVISIONED:
-            chk1 = self.configure_iface()
-        if chk1:
-            chk2 = self.configure_ip()
+            if self.configure_iface():
+                self.configure_ip()
 
-        if not (chk1 and chk2) :
-            return False
-            
         super(OMFWifiInterface, self).deploy()
-        return True
-
-
-    def start(self):
-        """ Start the RM. It means nothing special for a channel for now
-        It becomes STARTED as soon as this method starts.
-
-        """
-
-        super(OMFWifiInterface, self).start()
-
-    def stop(self):
-        """ Stop the RM. It means nothing special for a channel for now
-        It becomes STOPPED as soon as this method is called
-
-        """
-        super(OMFWifiInterface, self).stop()
-        self.set_finished()
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
-        if self._omf_api :
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        try:
+            if self._omf_api :
+                OMFAPIFactory.release_api(self.get('xmppSlice'), 
+                    self.get('xmppHost'), self.get('xmppPort'), 
+                    self.get('xmppPassword'), exp_id = self.exp_id)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(OMFWifiInterface, self).release()
 
index 99bedf6..4da85d3 100644 (file)
 #         Julien Tribino <julien.tribino@inria.fr>
 
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
 import time
 
-
 @clsinit_copy
 class OMFNode(OMFResource):
     """
@@ -98,6 +97,7 @@ class OMFNode(OMFResource):
 
         return False
 
+    @failtrap
     def deploy(self):
         """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol 
             to enroll the node into the experiment.
@@ -112,63 +112,37 @@ class OMFNode(OMFResource):
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if not self.get('hostname') :
             msg = "Hostname's value is not initialized"
             self.error(msg)
-            self.fail()
-            return False
+            raise RuntimeError, msg
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            #raise AttributeError, msg
+            raise 
 
         super(OMFNode, self).deploy()
 
-    def discover(self):
-        """ Discover the availables nodes
-
-        """
-        pass
-     
-    def provision(self):
-        """ Provision some availables nodes
-
-        """
-        pass
-
-    def start(self):
-        """Start the RM. It means nothing special for an interface for now
-           It becomes STARTED as soon as this method starts.
-
-        """
-
-        super(OMFNode, self).start()
-
-    def stop(self):
-        """Stop the RM. It means nothing special for an interface for now
-           It becomes STOPPED as soon as this method stops
-
-        """
-        super(OMFNode, self).stop()
-        self.set_finished()
-
     def release(self):
         """Clean the RM at the end of the experiment
 
         """
-        if self._omf_api :
-            self._omf_api.release(self.get('hostname'))
-
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        try:
+            if self._omf_api :
+                self._omf_api.release(self.get('hostname'))
+
+                OMFAPIFactory.release_api(self.get('xmppSlice'), 
+                    self.get('xmppHost'), self.get('xmppPort'), 
+                    self.get('xmppPassword'), exp_id = self.exp_id)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(OMFNode, self).release()
 
index ed2de65..632d201 100644 (file)
@@ -19,8 +19,8 @@
 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
 
 class ResourceGateway:
     """
@@ -38,7 +38,7 @@ class ResourceGateway:
         "nicta" : "??.??.??",
     })
 
-@clsinit
+@clsinit_copy
 class OMFResource(ResourceManager):
     """
     Generic resource gathering XMPP credential information and common methods
index b26a4f3..57196b3 100644 (file)
@@ -19,8 +19,8 @@
 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
 from nepi.util.execfuncs import lexec
index bffc61e..51628fd 100644 (file)
@@ -19,7 +19,8 @@
 #         Alexandros Kouvakas <alexandros.kouvakas@inria.fr>
 
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, failtrap
 from nepi.execution.attribute import Attribute, Flags
 from nepi.resources.planetlab.node import PlanetlabNode        
 from nepi.resources.linux.application import LinuxApplication
@@ -114,6 +115,7 @@ class OVSWitch(LinuxApplication):
         # TODO: Validate!
         return True
 
+    @failtrap
     def provision(self):
         # create home dir for ovs
         self.node.mkdir(self.ovs_home)
@@ -136,13 +138,16 @@ class OVSWitch(LinuxApplication):
                 stderr = "check_cmd_stderr")
 
         (out, err), proc = self.node.check_output(self.ovs_checks, 'check_cmd_exitcode')
+        
         if out != "0\n":
             msg = "Command sliver-ovs does not exist on the VM"         
             self.debug(msg)
             raise RuntimeError, msg
+
         msg = "Command sliver-ovs exists" 
         self.debug(msg)                                                
 
+    @failtrap
     def deploy(self):
         """ Wait until node is associated and deployed
         """
@@ -152,19 +157,15 @@ class OVSWitch(LinuxApplication):
             self.ec.schedule(reschedule_delay, self.deploy)
 
         else:
-            try:
-                self.discover()
-                self.provision()
-                self.check_sliver_ovs()
-                self.servers_on()
-                self.create_bridge()
-                self.assign_contr()
-                self.ovs_status()
-            except:
-                self._state = ResourceState.FAILED
-                raise
-                
-            self._state = ResourceState.READY
+            self.discover()
+            self.provision()
+            self.check_sliver_ovs()
+            self.servers_on()
+            self.create_bridge()
+            self.assign_contr()
+            self.ovs_status()
+            
+            super(OVSWitch, self).deploy()
 
     def servers_on(self):
         """ Start the openvswitch servers and also checking 
@@ -201,9 +202,11 @@ class OVSWitch(LinuxApplication):
 
         # Check if the servers are running or not
         (out, err), proc = self.node.check_output(self.ovs_checks, 'status_srv_exitcode')
+        
         if out != "0\n":
             self.debug("Servers are not running")
             raise RuntimeError, msg
+        
         self.info("Servers started")  
 
     def del_old_br(self):
@@ -279,44 +282,37 @@ class OVSWitch(LinuxApplication):
         (out, err), proc = self.node.check_output(self.ovs_home, 'show_stdout')
         self.info(out)
 
-    def start(self):
-        """ Start the RM. It means nothing special for 
-            ovswitch for now.  
-        """
-        pass
-
-    def stop(self):
-        """ Stop the RM.It means nothing 
-            for ovswitch for now.
-        """
-        pass
-
     def release(self):
         """ Delete the bridge and 
             close the servers
         """
         # Node needs to wait until all associated RMs are released
         # to be released
-        from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
-        rm = self.get_connected(OVSPort.rtype())
+        try:
+            from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
+            rm = self.get_connected(OVSPort.rtype())
 
-        if rm[0].state < ResourceState.FINISHED:
-            self.ec.schedule(reschedule_delay, self.release)
-            return 
+            if rm[0].state < ResourceState.FINISHED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
+                
+            msg = "Deleting the bridge %s" % self.get('bridge_name')
+            self.info(msg)
+            cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
+            (out, err), proc = self.node.run(cmd, self.ovs_checks,
+                    sudo = True)
+            cmd = "sliver-ovs stop"
+            (out, err), proc = self.node.run(cmd, self.ovs_checks,
+                    sudo = True)
             
-        msg = "Deleting the bridge %s" % self.get('bridge_name')
-        self.info(msg)
-        cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
-        (out, err), proc = self.node.run(cmd, self.ovs_checks,
-                sudo = True)
-        cmd = "sliver-ovs stop"
-        (out, err), proc = self.node.run(cmd, self.ovs_checks,
-                sudo = True)
-        
-        if proc.poll():
-            self.fail()
-            self.error(msg, out, err)
-            raise RuntimeError, msg
-     
-        self._state = ResourceState.RELEASED
-        
+            if proc.poll():
+                self.fail()
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
+
+        super(OVSWitch, self).release()
+
index d3e9d79..a7155fb 100644 (file)
@@ -19,7 +19,8 @@
 #            Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState       
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, failtrap
 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch        
 from nepi.resources.planetlab.node import PlanetlabNode        
 from nepi.resources.linux.application import LinuxApplication
@@ -178,16 +179,7 @@ class OVSPort(LinuxApplication):
         command = self.replace_paths(command)
         return command
         
-    def provision(self):
-        """ Provision the ports.No meaning.
-        """
-        pass
-
-    def discover(self):
-        """ Discover the ports.No meaning
-        """    
-        pass
-
+    @failtrap
     def deploy(self):
         """ Wait until ovswitch is started
         """
@@ -197,50 +189,39 @@ class OVSPort(LinuxApplication):
             self.ec.schedule(reschedule_delay, self.deploy)
             
         else:
-            try:
-                self.discover()
-                self.provision()
-                self.get_host_ip()
-                self.create_port()
-                self.get_local_end()
-                self.ovswitch.ovs_status()
-                self._state = ResourceState.READY
-            except:
-                self._state = ResourceState.FAILED
-                raise
-
-    def start(self):
-        """ Start the RM. It means nothing special for 
-            ovsport for now.
-        """
-        pass
-       
-    def stop(self):
-        """ Stop the RM. It means nothing special for 
-            ovsport for now.        
-        """
-        pass
-        
+            self.discover()
+            self.provision()
+            self.get_host_ip()
+            self.create_port()
+            self.get_local_end()
+            self.ovswitch.ovs_status()
+            super(OVSPort, self).deploy()
+
     def release(self):
         """ Release the port RM means delete the ports
         """
         # OVS needs to wait until all associated RMs are released
         # to be released
-        from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
-        rm = self.get_connected(Tunnel.rtype())
-        if rm and rm[0].state < ResourceState.FINISHED:
-            self.ec.schedule(reschedule_delay, self.release)
-            return 
-            
-        msg = "Deleting the port %s" % self.get('port_name')
-        self.info(msg)
-        cmd = "sliver-ovs del_port %s" % self.get('port_name')
-        (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
-                sudo = True)
-
-        if proc.poll():
-            self.fail()
-            self.error(msg, out, err)
-            raise RuntimeError, msg
-
-        self._state = ResourceState.RELEASED
+        try:
+            from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
+            rm = self.get_connected(Tunnel.rtype())
+            if rm and rm[0].state < ResourceState.FINISHED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
+                
+            msg = "Deleting the port %s" % self.get('port_name')
+            self.info(msg)
+            cmd = "sliver-ovs del_port %s" % self.get('port_name')
+            (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
+                    sudo = True)
+
+            if proc.poll():
+                self.fail()
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
+
+        super(OVSPort, self).release()
index 72c6728..c1f81fe 100644 (file)
@@ -19,7 +19,8 @@
 #            Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState 
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode            
 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch   
@@ -29,11 +30,10 @@ import os
 import time
 import socket
 
-
 reschedule_delay = "0.5s"
 
 @clsinit_copy                 
-class Tunnel(LinuxApplication):
+class OVSTunnel(LinuxApplication):
     """
     .. class:: Class Args :
       
@@ -46,7 +46,7 @@ class Tunnel(LinuxApplication):
 
     """
     
-    _rtype = "Tunnel"
+    _rtype = "OVSTunnel"
     _authorized_connections = ["OVSPort", "PlanetlabTap"]    
 
     @classmethod
@@ -93,7 +93,7 @@ class Tunnel(LinuxApplication):
         :type guid: int
     
         """
-        super(Tunnel, self).__init__(ec, guid)
+        super(OVSTunnel, self).__init__(ec, guid)
         self._home = "tunnel-%s" % self.guid
         self.port_info_tunl = []
         self._nodes = []
@@ -246,6 +246,7 @@ class Tunnel(LinuxApplication):
             self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
+
         msg = "Connection on host %s configured" \
             % self.node.get("hostname")
         self.info(msg)
@@ -344,6 +345,7 @@ class Tunnel(LinuxApplication):
             self.info(msg)
             return                                                      
 
+    @failtrap
     def provision(self):
         """ Provision the tunnel
         """
@@ -363,68 +365,41 @@ class Tunnel(LinuxApplication):
             (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
             switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
 
-        self.debug("------- READY -------")
-        self._provision_time = tnow()
-        self._state = ResourceState.PROVISIONED
-
-    def discover(self):
-        """ Discover the tunnel
-
-        """    
-        pass
+        super(OVSTunnel, self).provision()
 
+    @failtrap
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                raise
-            self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
-
-    def start(self):
-        """ Start the RM. It means nothing special for 
-            ovsport for now.
-        """
-        pass
-        
-       
-    def stop(self):
-        """ Stop the RM. It means nothing special for 
-            ovsport for now.        
-        """
-        pass
+            self.discover()
+            self.provision()
 
+            super(OVSTunnel, self).deploy()
     def release(self):
         """ Release the udp_tunnel on endpoint2.
             On endpoint1 means nothing special.        
         """
-        if not self.check_endpoints():
-            # Kill the TAP devices
-            # TODO: Make more generic Release method of PLTAP
-            if self._pid and self._ppid:
-                self._nodes = self.get_node(self.endpoint2) 
-                (out, err), proc = self.node.kill(self._pid,
-                        self._ppid, sudo = True)
-            if err or proc.poll():
-                    # check if execution errors occurred
-                    msg = " Failed to delete TAP device"
-                    self.error(msg, err, err)
-                    self.fail()
-            
-        self._state = ResourceState.RELEASED
-
-
-
-
-
-
+        try:
+            if not self.check_endpoints():
+                # Kill the TAP devices
+                # TODO: Make more generic Release method of PLTAP
+                if self._pid and self._ppid:
+                    self._nodes = self.get_node(self.endpoint2) 
+                    (out, err), proc = self.node.kill(self._pid,
+                            self._ppid, sudo = True)
+                if err or proc.poll():
+                        # check if execution errors occurred
+                        msg = " Failed to delete TAP device"
+                        self.error(msg, err, err)
+                        self.fail()
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
+
+        super(OVSTunnel, self).release()
 
 
index 991fa99..d9cf17c 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-        reschedule_delay
+        reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -147,6 +147,7 @@ class PlanetlabTap(LinuxApplication):
         if_name = self.wait_if_name()
         self.set("deviceName", if_name) 
 
+    @failtrap
     def deploy(self):
         if not self.node or self.node.state < ResourceState.PROVISIONED:
             self.ec.schedule(reschedule_delay, self.deploy)
@@ -160,16 +161,13 @@ class PlanetlabTap(LinuxApplication):
             if not self.get("install"):
                 self.set("install", self._install)
 
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            self.discover()
+            self.provision()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -179,8 +177,9 @@ class PlanetlabTap(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
+    @failtrap
     def stop(self):
         command = self.get('command') or ''
         
@@ -214,12 +213,17 @@ class PlanetlabTap(LinuxApplication):
     def release(self):
         # Node needs to wait until all associated RMs are released
         # to be released
-        from nepi.resources.linux.udptunnel import UdpTunnel
-        rms = self.get_connected(UdpTunnel.rtype())
-        for rm in rms:
-            if rm.state < ResourceState.STOPPED:
-                self.ec.schedule(reschedule_delay, self.release)
-                return 
+        try:
+            from nepi.resources.linux.udptunnel import UdpTunnel
+            rms = self.get_connected(UdpTunnel.rtype())
+            for rm in rms:
+                if rm.state < ResourceState.STOPPED:
+                    self.ec.schedule(reschedule_delay, self.release)
+                    return 
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(PlanetlabTap, self).release()
 
index f5d39d7..b7caeac 100644 (file)
@@ -30,25 +30,15 @@ N_PROCS = None
 class WorkerThread(threading.Thread):
     class QUIT:
         pass
-    class REASSIGNED:
-        pass
-    
+
     def run(self):
         while True:
             task = self.queue.get()
-            if task is None:
-                self.done = True
-                self.queue.task_done()
-                continue
-            elif task is self.QUIT:
-                self.done = True
+
+            if task is self.QUIT:
                 self.queue.task_done()
                 break
-            elif task is self.REASSIGNED:
-                continue
-            else:
-                self.done = False
-            
+
             try:
                 try:
                     callable, args, kwargs = task
@@ -61,41 +51,35 @@ class WorkerThread(threading.Thread):
             except:
                 traceback.print_exc(file = sys.stderr)
                 self.delayed_exceptions.append(sys.exc_info())
-    
-    def waitdone(self):
-        while not self.queue.empty() and not self.done:
-            self.queue.join()
-    
+
     def attach(self, queue, rvqueue, delayed_exceptions):
-        if self.isAlive():
-            self.waitdone()
-            oldqueue = self.queue
         self.queue = queue
         self.rvqueue = rvqueue
         self.delayed_exceptions = delayed_exceptions
-        if self.isAlive():
-            oldqueue.put(self.REASSIGNED)
-    
-    def detach(self):
-        if self.isAlive():
-            self.waitdone()
-            self.oldqueue = self.queue
-        self.queue = Queue.Queue()
-        self.rvqueue = None
-        self.delayed_exceptions = []
-    
-    def detach_signal(self):
-        if self.isAlive():
-            self.oldqueue.put(self.REASSIGNED)
-            del self.oldqueue
-        
+   
     def quit(self):
         self.queue.put(self.QUIT)
-        self.join()
 
-class ParallelMap(object):
+class ParallelRun(object):
     def __init__(self, maxthreads = None, maxqueue = None, results = True):
+        self.maxqueue = maxqueue
+        self.maxthreads = maxthreads
+        
+        self.queue = Queue.Queue(self.maxqueue or 0)
+        
+        self.delayed_exceptions = []
+        
+        if results:
+            self.rvqueue = Queue.Queue()
+        else:
+            self.rvqueue = None
+    
+        self.initialize_workers()
+
+    def initialize_workers(self):
         global N_PROCS
+
+        maxthreads = self.maxthreads
        
         # Compute maximum number of threads allowed by the system
         if maxthreads is None:
@@ -112,42 +96,30 @@ class ParallelMap(object):
         
         if maxthreads is None:
             maxthreads = 4
-        
-        self.queue = Queue.Queue(maxqueue or 0)
-
-        self.delayed_exceptions = []
-        
-        if results:
-            self.rvqueue = Queue.Queue()
-        else:
-            self.rvqueue = None
-    
         self.workers = []
 
         # initialize workers
         for x in xrange(maxthreads):
-            t = None
-            if t is None:
-                t = WorkerThread()
-                t.setDaemon(True)
-            else:
-                t.waitdone()
-
-            t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
-            self.workers.append(t)
+            worker = WorkerThread()
+            worker.attach(self.queue, self.rvqueue, self.delayed_exceptions)
+            worker.setDaemon(True)
+
+            self.workers.append(worker)
     
     def __del__(self):
         self.destroy()
-    
+
+    def empty(self):
+        while True:
+            try:
+                self.queue.get(block = False)
+                self.queue.task_done()
+            except Queue.Empty:
+                break
+  
     def destroy(self):
-        for worker in self.workers:
-            worker.waitdone()
-        for worker in self.workers:
-            worker.detach()
-        for worker in self.workers:
-            worker.detach_signal()
-        for worker in self.workers:
-            worker.quit()
+        self.join()
 
         del self.workers[:]
         
@@ -158,28 +130,21 @@ class ParallelMap(object):
         self.queue.put_nowait((callable, args, kwargs))
 
     def start(self):
-        for thread in self.workers:
-            if not thread.isAlive():
-                thread.start()
+        for worker in self.workers:
+            if not worker.isAlive():
+                worker.start()
     
     def join(self):
-        for thread in self.workers:
-            # That's the sync signal
-            self.queue.put(None)
-            
+        # Wait until all queued tasks have been processed
         self.queue.join()
-        for thread in self.workers:
-            thread.waitdone()
-        
-        if self.delayed_exceptions:
-            typ,val,loc = self.delayed_exceptions[0]
-            del self.delayed_exceptions[:]
-            raise typ,val,loc
-        
-        self.destroy()
+
+        for worker in self.workers:
+            worker.quit()
+
+        for worker in self.workers:
+            worker.join()
     
     def sync(self):
-        self.queue.join()
         if self.delayed_exceptions:
             typ,val,loc = self.delayed_exceptions[0]
             del self.delayed_exceptions[:]
@@ -197,18 +162,3 @@ class ParallelMap(object):
                     except Queue.Empty:
                         raise StopIteration
             
-class ParallelRun(ParallelMap):
-    def __run(self, x):
-        fn, args, kwargs = x
-        return fn(*args, **kwargs)
-    
-    def __init__(self, maxthreads = None, maxqueue = None):
-        super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
-
-    def put(self, what, *args, **kwargs):
-        super(ParallelRun, self).put(self.__run, (what, args, kwargs))
-    
-    def put_nowait(self, what, *args, **kwargs):
-        super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
-
-
index 4602148..0304014 100755 (executable)
@@ -72,17 +72,23 @@ class ExecuteControllersTestCase(unittest.TestCase):
 
     def test_schedule_exception(self):
         def raise_error():
+            # When this task is executed and the error raise,
+            # the FailureManager should set its failure level to 
+            # TASK_FAILURE
             raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
 
         ec = ExperimentController()
-        ec.schedule("2s", raise_error)
 
-        while ec.ecstate not in [ECState.FAILED, ECState.TERMINATED]:
-           time.sleep(1)
+        tid = ec.schedule("2s", raise_error, track = True)
         
-        self.assertEquals(ec.ecstate, ECState.FAILED)
-        ec.shutdown()
+        while True:
+            task = ec.get_task(tid)
+            if task.status != TaskStatus.NEW:
+                break
+
+            time.sleep(1)
 
+        self.assertEquals(task.status, TaskStatus.ERROR)
 
 if __name__ == '__main__':
     unittest.main()
index 091c43e..e594f26 100755 (executable)
 
 
 from nepi.execution.attribute import Attribute
-from nepi.execution.ec import ExperimentController 
-from nepi.execution.resource import ResourceManager, ResourceState, clsinit, \
-        ResourceAction
+from nepi.execution.ec import ExperimentController, FailureLevel 
+from nepi.execution.resource import ResourceManager, ResourceState, \
+        clsinit_copy, ResourceAction, failtrap
 
 import random
 import time
 import unittest
 
-@clsinit
+@clsinit_copy
 class MyResource(ResourceManager):
     _rtype = "MyResource"
 
@@ -40,14 +40,13 @@ class MyResource(ResourceManager):
     def __init__(self, ec, guid):
         super(MyResource, self).__init__(ec, guid)
 
-@clsinit
+@clsinit_copy
 class AnotherResource(ResourceManager):
     _rtype = "AnotherResource"
 
     def __init__(self, ec, guid):
         super(AnotherResource, self).__init__(ec, guid)
 
-
 class Channel(ResourceManager):
     _rtype = "Channel"
 
@@ -89,7 +88,7 @@ class Node(ResourceManager):
             self.discover()
             self.provision()
             self.logger.debug(" -------- PROVISIONED ------- ")
-            self.ec.schedule("3s", self.deploy)
+            self.ec.schedule("1s", self.deploy)
         elif self.state == ResourceState.PROVISIONED:
             ifaces = self.get_connected(Interface.rtype())
             for rm in ifaces:
@@ -111,15 +110,29 @@ class Application(ResourceManager):
         if node.state < ResourceState.READY:
             self.ec.schedule("0.5s", self.deploy)
         else:
-            time.sleep(random.random() * 5)
+            time.sleep(random.random() * 2)
             super(Application, self).deploy()
             self.logger.debug(" -------- DEPLOYED ------- ")
 
     def start(self):
         super(Application, self).start()
-        time.sleep(random.random() * 5)
+        time.sleep(random.random() * 3)
         self._state = ResourceState.FINISHED
-   
+
+class ErrorApplication(ResourceManager):
+    _rtype = "ErrorApplication"
+
+    def __init__(self, ec, guid):
+        super(ErrorApplication, self).__init__(ec, guid)
+
+    @failtrap
+    def deploy(self):
+        node = self.get_connected(Node.rtype())[0]
+        if node.state < ResourceState.READY:
+            self.ec.schedule("0.5s", self.deploy)
+        else:
+            time.sleep(random.random() * 2)
+            raise RuntimeError, "NOT A REAL ERROR. JUST TESTING"
 
 class ResourceFactoryTestCase(unittest.TestCase):
     def test_add_resource_factory(self):
@@ -130,13 +143,13 @@ class ResourceFactoryTestCase(unittest.TestCase):
         ResourceFactory.register_type(AnotherResource)
 
         self.assertEquals(MyResource.rtype(), "MyResource")
-        self.assertEquals(len(MyResource._attributes), 1)
+        self.assertEquals(len(MyResource._attributes), 2)
 
         self.assertEquals(ResourceManager.rtype(), "Resource")
-        self.assertEquals(len(ResourceManager._attributes), 0)
+        self.assertEquals(len(ResourceManager._attributes), 1)
 
         self.assertEquals(AnotherResource.rtype(), "AnotherResource")
-        self.assertEquals(len(AnotherResource._attributes), 0)
+        self.assertEquals(len(AnotherResource._attributes), 1)
 
         self.assertEquals(len(ResourceFactory.resource_types()), 2)
         
@@ -274,15 +287,43 @@ class ResourceManagerTestCase(unittest.TestCase):
 
         ec.shutdown()
 
-    def test_start_with_condition(self):
+    def test_exception(self):
+        from nepi.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(ErrorApplication)
+        ResourceFactory.register_type(Node)
+        ResourceFactory.register_type(Interface)
+        ResourceFactory.register_type(Channel)
+
+        ec = ExperimentController()
+
+        node = ec.register_resource("Node")
+
+        apps = list()
+        for i in xrange(10):
+            app = ec.register_resource("ErrorApplication")
+            ec.register_connection(app, node)
+            apps.append(app)
+
+
+        ec.deploy()
+
+        ec.wait_finished(apps)
+
+        ec.shutdown()
+
+        self.assertTrue(ec._fm._failure_level == FailureLevel.RM_FAILURE)
+
+
+    def ztest_start_with_condition(self):
         # TODO!!!
         pass
     
-    def test_stop_with_condition(self):
+    def ztest_stop_with_condition(self):
         # TODO!!!
         pass
 
-    def test_set_with_condition(self):
+    def ztest_set_with_condition(self):
         # TODO!!!
         pass
 
index 0cb6c20..d072074 100644 (file)
@@ -112,15 +112,15 @@ class OvsTestCase(unittest.TestCase):
         ec.set(tap2, "prefix4", 24)
         ec.register_connection(tap2, node4)
 
-        ovstun1 = ec.register_resource("Tunnel")
+        ovstun1 = ec.register_resource("OVSTunnel")
         ec.register_connection(port1, ovstun1)
         ec.register_connection(tap1, ovstun1)
 
-        ovstun2 = ec.register_resource("Tunnel")
+        ovstun2 = ec.register_resource("OVSTunnel")
         ec.register_connection(port3, ovstun2)
         ec.register_connection(tap2, ovstun2)
 
-        ovstun3 = ec.register_resource("Tunnel")
+        ovstun3 = ec.register_resource("OVSTunnel")
         ec.register_connection(port2, ovstun3)
         ec.register_connection(port4, ovstun3)
 
diff --git a/test/util/parallel.py b/test/util/parallel.py
new file mode 100644 (file)
index 0000000..7f8f5a0
--- /dev/null
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+
+from nepi.util.parallel import ParallelRun
+
+import datetime
+import unittest
+
+class ParallelRunTestCase(unittest.TestCase):
+    def test_run_simple(self):
+        runner = ParallelRun(maxthreads = 4)
+        runner.start()
+
+        count = [0]
+
+        def inc(count):
+            count[0] += 1
+        
+        for x in xrange(10):
+            runner.put(inc, count)
+
+        runner.destroy()
+
+        self.assertEquals(count[0], 10)
+
+    def test_run_interrupt(self):
+
+        def sleep():
+            import time
+            time.sleep(5)
+
+        startt = datetime.datetime.now()
+
+        runner = ParallelRun(maxthreads = 4)
+        runner.start()
+       
+        for x in xrange(100):
+            runner.put(sleep)
+
+        runner.empty()
+        runner.destroy()
+
+        endt = datetime.datetime.now()
+        time_elapsed = (endt - startt).seconds
+        self.assertTrue( time_elapsed < 500)
+
+    def test_run_error(self):
+        count = [0]
+
+        def inc(count):
+            count[0] += 1
+        def error():
+            raise RuntimeError()
+
+        runner = ParallelRun(maxthreads = 4)
+        runner.start()
+       
+        for x in xrange(4):
+            runner.put(inc, count)
+
+        runner.put(error)
+       
+        runner.destroy()
+      
+        self.assertEquals(count[0], 4)
+        
+        self.assertRaises(RuntimeError, runner.sync)
+
+if __name__ == '__main__':
+    unittest.main()
+