Modified FailureManager to abort only when critical resources fail
author Alina Quereilhac <alina.quereilhac@inria.fr>
Thu, 24 Oct 2013 13:08:59 +0000 (15:08 +0200)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Thu, 24 Oct 2013 13:08:59 +0000 (15:08 +0200)
36 files changed:
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/all/collector.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnapplication.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnping.py
src/nepi/resources/linux/ccn/ccnpingserver.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/channel.py
src/nepi/resources/linux/interface.py
src/nepi/resources/linux/mtr.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/nping.py
src/nepi/resources/linux/ping.py
src/nepi/resources/linux/tcpdump.py
src/nepi/resources/linux/traceroute.py
src/nepi/resources/linux/udptest.py
src/nepi/resources/linux/udptunnel.py
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/omf/omf_resource.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/openvswitch/ovs.py
src/nepi/resources/planetlab/openvswitch/ovsport.py
src/nepi/resources/planetlab/openvswitch/tunnel.py
src/nepi/resources/planetlab/tap.py
src/nepi/util/parallel.py
test/execution/ec.py
test/execution/resource.py
test/resources/planetlab/ovs.py
test/util/parallel.py [new file with mode: 0644]

index 7277ae6..c5978de 100644 (file)
@@ -35,52 +35,48 @@ import random
 import sys
 import time
 import threading
 import sys
 import time
 import threading
-
-class FailurePolicy(object):
-    """ Defines how to respond to experiment failures  
-    """
-    IGNORE_RM_FAILURE = 1
-    ABORT_ON_RM_FAILURE = 2
+import weakref
 
 class FailureLevel(object):
 
 class FailureLevel(object):
-    """ Describe the system failure state
+    """ Describes the system failure state
     """
     OK = 1
     RM_FAILURE = 2
     """
     OK = 1
     RM_FAILURE = 2
-    TASK_FAILURE = 3
-    EC_FAILURE = 4
+    EC_FAILURE = 3
 
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors,
     and deciding whether an experiment should be aborted
     """
 
 
 class FailureManager(object):
     """ The FailureManager is responsible for handling errors,
     and deciding whether an experiment should be aborted
     """
 
-    def __init__(self, failure_policy = None):
+    def __init__(self, ec):
+        self._ec = weakref.ref(ec)
         self._failure_level = FailureLevel.OK
         self._failure_level = FailureLevel.OK
-        self._failure_policy = failure_policy or \
-                FailurePolicy.ABORT_ON_RM_FAILURE
 
     @property
 
     @property
-    def abort(self):
-        if self._failure_level == FailureLevel.EC_FAILURE:
-            return True
-
-        if self._failure_level in [FailureLevel.TASK_FAILURE, 
-                FailureLevel.RM_FAILURE] and \
-                        self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
-            return True
-
-        return False
+    def ec(self):
+        """ Returns the Experiment Controller """
+        return self._ec()
 
 
-    def set_rm_failure(self):
-        self._failure_level = FailureLevel.RM_FAILURE
+    @property
+    def abort(self):
+        if self._failure_level == FailureLevel.OK:
+            for guid in self.ec.resources:
+                state = self.ec.state(guid)
+                critical = self.ec.get(guid, "critical")
+
+                if state == ResourceState.FAILED and critical:
+                    self._failure_level = FailureLevel.RM_FAILURE
+                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+                    break
 
 
-    def set_task_failure(self):
-        self._failure_level = FailureLevel.TASK_FAILURE
+        return self._failure_level != FailureLevel.OK
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
 
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
 
+
 class ECState(object):
     """ State of the Experiment Controller
    
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -175,7 +171,9 @@ class ExperimentController(object):
         # Resource managers
         self._resources = dict()
 
         # Resource managers
         self._resources = dict()
 
-        # Scheduler
+        # Scheduler. It is a queue that holds tasks scheduled for
+        # execution, and yields the next task to be executed 
+        # ordered by execution and arrival time
         self._scheduler = HeapScheduler()
 
         # Tasks
         self._scheduler = HeapScheduler()
 
         # Tasks
@@ -186,22 +184,27 @@ class ExperimentController(object):
 
         # generator of globally unique id for groups
         self._group_id_generator = guid.GuidGenerator()
 
         # generator of globally unique id for groups
         self._group_id_generator = guid.GuidGenerator()
-        # Event processing thread
-        self._cond = threading.Condition()
-        self._thread = threading.Thread(target = self._process)
-        self._thread.setDaemon(True)
-        self._thread.start()
 
         # Flag to stop processing thread
         self._stop = False
     
         # Entity in charge of managing system failures
 
         # Flag to stop processing thread
         self._stop = False
     
         # Entity in charge of managing system failures
-        self._fm = FailureManager()
+        self._fm = FailureManager(self)
 
         # EC state
         self._state = ECState.RUNNING
 
 
         # EC state
         self._state = ECState.RUNNING
 
+        # The runner is a pool of threads used to parallelize 
+        # execution of tasks
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+        self._runner = ParallelRun(maxthreads = nthreads)
+
+        # Event processing thread
+        self._cond = threading.Condition()
+        self._thread = threading.Thread(target = self._process)
+        self._thread.setDaemon(True)
+        self._thread.start()
+
     @property
     def logger(self):
         """ Return the logger of the Experiment Controller
     @property
     def logger(self):
         """ Return the logger of the Experiment Controller
@@ -234,9 +237,6 @@ class ExperimentController(object):
     def abort(self):
         return self._fm.abort
 
     def abort(self):
         return self._fm.abort
 
-    def set_rm_failure(self):
-        self._fm.set_rm_failure()
-
     def wait_finished(self, guids):
         """ Blocking method that wait until all RMs in the 'guid' list 
             reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
     def wait_finished(self, guids):
         """ Blocking method that wait until all RMs in the 'guid' list 
             reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
@@ -316,17 +316,19 @@ class ExperimentController(object):
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
             rstate = self.state(guid)
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
             rstate = self.state(guid)
+            
+            hrrstate = ResourceState2str.get(rstate)
+            hrstate = ResourceState2str.get(state)
 
             if rstate >= state:
                 guids.remove(guid)
 
             if rstate >= state:
                 guids.remove(guid)
+                self.logger.debug(" guid %d DONE - state is %s, required is >= %s " % (
+                    guid, hrrstate, hrstate))
             else:
                 # Debug...
             else:
                 # Debug...
-                hrrstate = ResourceState2str.get(rstate)
-                hrstate = ResourceState2str.get(state)
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
                     guid, hrrstate, hrstate))
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
                     guid, hrrstate, hrstate))
-
-            time.sleep(0.5)
+                time.sleep(0.5)
   
     def get_task(self, tid):
         """ Get a specific task
   
     def get_task(self, tid):
         """ Get a specific task
@@ -694,8 +696,10 @@ class ExperimentController(object):
             guids = self.resources
 
         # Remove all pending tasks from the scheduler queue
             guids = self.resources
 
         # Remove all pending tasks from the scheduler queue
-        for tis in self._scheduler.pending:
-            self._scheduler.remove(tis)
+        for tid in list(self._scheduler.pending):
+            self._scheduler.remove(tid)
+
+        self._runner.empty()
 
         for guid in guids:
             rm = self.get_resource(guid)
 
         for guid in guids:
             rm = self.get_resource(guid)
@@ -708,6 +712,10 @@ class ExperimentController(object):
         Releases all the resources and stops task processing thread
 
         """
         Releases all the resources and stops task processing thread
 
         """
+        # If there was a major failure we can't exit gracefully
+        if self._state == ECState.FAILED:
+            raise RuntimeError("EC failure. Can not exit gracefully")
+
         self.release()
 
         # Mark the EC state as TERMINATED
         self.release()
 
         # Mark the EC state as TERMINATED
@@ -788,10 +796,8 @@ class ExperimentController(object):
         that might have been raised by the workers.
 
         """
         that might have been raised by the workers.
 
         """
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
 
-        runner = ParallelRun(maxthreads = nthreads)
-        runner.start()
+        self._runner.start()
 
         while not self._stop:
             try:
 
         while not self._stop:
             try:
@@ -822,7 +828,7 @@ class ExperimentController(object):
 
                 if task:
                     # Process tasks in parallel
 
                 if task:
                     # Process tasks in parallel
-                    runner.put(self._execute, task)
+                    self._runner.put(self._execute, task)
             except: 
                 import traceback
                 err = traceback.format_exc()
             except: 
                 import traceback
                 err = traceback.format_exc()
@@ -830,13 +836,14 @@ class ExperimentController(object):
 
                 # Set the EC to FAILED state 
                 self._state = ECState.FAILED
 
                 # Set the EC to FAILED state 
                 self._state = ECState.FAILED
-
-                # Set the FailureManager failure level
+            
+                # Set the FailureManager failure level to EC failure
                 self._fm.set_ec_failure()
 
         self.logger.debug("Exiting the task processing loop ... ")
                 self._fm.set_ec_failure()
 
         self.logger.debug("Exiting the task processing loop ... ")
-        runner.sync()
-        runner.destroy()
+        
+        self._runner.sync()
+        self._runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
 
     def _execute(self, task):
         """ Executes a single task. 
@@ -864,9 +871,6 @@ class ExperimentController(object):
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
-            # Set the FailureManager failure level
-            self._fm.set_task_failure()
-
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
         for a new task to be scheduled.
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
         for a new task to be scheduled.
index 18edeb0..b4994e0 100644 (file)
@@ -19,6 +19,7 @@
 
 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
 from nepi.util.logger import Logger
 
 from nepi.util.timefuncs import tnow, tdiff, tdiffsec, stabsformat
 from nepi.util.logger import Logger
+from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import TraceAttr
 
 import copy
 from nepi.execution.trace import TraceAttr
 
 import copy
@@ -80,6 +81,20 @@ def clsinit_copy(cls):
     cls._clsinit_copy()
     return cls
 
     cls._clsinit_copy()
     return cls
 
+def failtrap(func):
+    def wrapped(self, *args, **kwargs):
+        try:
+            return func(self, *args, **kwargs)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
+            self.debug("SETTING guid %d to state FAILED" % self.guid)
+            self.fail()
+            raise
+    
+    return wrapped
+
 # Decorator to invoke class initialization method
 @clsinit
 class ResourceManager(Logger):
 # Decorator to invoke class initialization method
 @clsinit
 class ResourceManager(Logger):
@@ -138,8 +153,14 @@ class ResourceManager(Logger):
         resource attributes
 
         """
         resource attributes
 
         """
-        pass
+        critical = Attribute("critical", "Defines whether the resource is critical. "
+                " A failure on a critical resource will interrupt the experiment. ",
+                type = Types.Bool,
+                default = True,
+                flags = Flags.ExecReadOnly)
 
 
+        cls._register_attribute(critical)
+        
     @classmethod
     def _register_traces(cls):
         """ Resource subclasses will invoke this method to register
     @classmethod
     def _register_traces(cls):
         """ Resource subclasses will invoke this method to register
@@ -309,7 +330,7 @@ class ResourceManager(Logger):
 
     @property
     def state(self):
 
     @property
     def state(self):
-        """ Get the state of the current RM """
+        """ Get the current state of the RM """
         return self._state
 
     def log_message(self, msg):
         return self._state
 
     def log_message(self, msg):
@@ -344,27 +365,42 @@ class ResourceManager(Logger):
     def discover(self):
         """ Performs resource discovery.
 
     def discover(self):
         """ Performs resource discovery.
 
-        This  method is resposible for selecting an individual resource
+        This  method is responsible for selecting an individual resource
         matching user requirements.
         This method should be redefined when necessary in child classes.
         matching user requirements.
         This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
         """
         self.set_discovered()
 
     def provision(self):
         """ Performs resource provisioning.
 
         """
         self.set_discovered()
 
     def provision(self):
         """ Performs resource provisioning.
 
-        This  method is resposible for provisioning one resource.
+        This  method is responsible for provisioning one resource.
         After this method has been successfully invoked, the resource
         After this method has been successfully invoked, the resource
-        should be acccesible/controllable by the RM.
+        should be accessible/controllable by the RM.
         This method should be redefined when necessary in child classes.
         This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
         """
         self.set_provisioned()
 
     def start(self):
         """
         self.set_provisioned()
 
     def start(self):
-        """ Starts the resource.
+        """ Starts the RM.
         
         There is no generic start behavior for all resources.
         This method should be redefined when necessary in child classes.
         
         There is no generic start behavior for all resources.
         This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
         """
         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
             self.error("Wrong state %s for start" % self.state)
         """
         if not self.state in [ResourceState.READY, ResourceState.STOPPED]:
             self.error("Wrong state %s for start" % self.state)
@@ -373,10 +409,15 @@ class ResourceManager(Logger):
         self.set_started()
 
     def stop(self):
         self.set_started()
 
     def stop(self):
-        """ Stops the resource.
+        """ Interrupts the RM, stopping any tasks the RM was performing.
         
         There is no generic stop behavior for all resources.
         This method should be redefined when necessary in child classes.
         
         There is no generic stop behavior for all resources.
         This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
         """
         if not self.state in [ResourceState.STARTED]:
             self.error("Wrong state %s for stop" % self.state)
         """
         if not self.state in [ResourceState.STARTED]:
             self.error("Wrong state %s for stop" % self.state)
@@ -385,7 +426,15 @@ class ResourceManager(Logger):
         self.set_stopped()
 
     def deploy(self):
         self.set_stopped()
 
     def deploy(self):
-        """ Execute all steps required for the RM to reach the state READY
+        """ Execute all steps required for the RM to reach the state READY.
+
+        This  method is responsible for deploying the resource (and invoking the
+        discover and provision methods).
+        This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
 
         """
         if self.state > ResourceState.READY:
 
         """
         if self.state > ResourceState.READY:
@@ -396,14 +445,41 @@ class ResourceManager(Logger):
         self.set_ready()
 
     def release(self):
         self.set_ready()
 
     def release(self):
+        """ Perform actions to free resources used by the RM.
+        
+        This  method is responsible for releasing resources that were
+        used during the experiment by the RM.
+        This method should be redefined when necessary in child classes.
+
+        If overridden in child classes, this method should never
+        raise an error and it must ensure the RM is set to state RELEASED.
+
+        """
         self.set_released()
 
     def finish(self):
         self.set_released()
 
     def finish(self):
+        """ Sets the RM to state FINISHED. 
+        
+        The FINISHED state is different from STOPPED in that it should not be 
+        directly invoked by the user.
+        STOPPED indicates that the user interrupted the RM, FINISHED means
+        that the RM concluded normally the actions it was supposed to perform.
+        This method should be redefined when necessary in child classes.
+        
+        If overridden in child classes, make sure to use the failtrap 
+        decorator to ensure the RM state will be set to FAILED in the event 
+        of an exception.
+
+        """
+
         self.set_finished()
  
     def fail(self):
         self.set_finished()
  
     def fail(self):
+        """ Sets the RM to state FAILED.
+
+        """
+
         self.set_failed()
         self.set_failed()
-        self.ec.set_rm_failure()
 
     def set(self, name, value):
         """ Set the value of the attribute
 
     def set(self, name, value):
         """ Set the value of the attribute
@@ -641,9 +717,11 @@ class ResourceManager(Logger):
         reschedule = False
         delay = reschedule_delay 
 
         reschedule = False
         delay = reschedule_delay 
 
-        ## evaluate if set conditions are met
+        ## evaluate if conditions to start are met
+        if self.ec.abort:
+            return 
 
 
-        # only can start when RM is either STOPPED or READY
+        # Can only start when RM is either STOPPED or READY
         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
             reschedule = True
             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
         if self.state not in [ResourceState.STOPPED, ResourceState.READY]:
             reschedule = True
             self.debug("---- RESCHEDULING START ---- state %s " % self.state )
@@ -680,11 +758,14 @@ class ResourceManager(Logger):
         reschedule = False
         delay = reschedule_delay 
 
         reschedule = False
         delay = reschedule_delay 
 
-        ## evaluate if set conditions are met
+        ## evaluate if conditions to stop are met
+        if self.ec.abort:
+            return 
 
         # only can stop when RM is STARTED
         if self.state != ResourceState.STARTED:
             reschedule = True
 
         # only can stop when RM is STARTED
         if self.state != ResourceState.STARTED:
             reschedule = True
+            self.debug("---- RESCHEDULING STOP ---- state %s " % self.state )
         else:
             self.debug(" ---- STOP CONDITIONS ---- %s" % 
                     self.conditions.get(ResourceAction.STOP))
         else:
             self.debug(" ---- STOP CONDITIONS ---- %s" % 
                     self.conditions.get(ResourceAction.STOP))
@@ -710,7 +791,9 @@ class ResourceManager(Logger):
         reschedule = False
         delay = reschedule_delay 
 
         reschedule = False
         delay = reschedule_delay 
 
-        ## evaluate if set conditions are met
+        ## evaluate if conditions to deploy are met
+        if self.ec.abort:
+            return 
 
         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
 
         # only can deploy when RM is either NEW, DISCOVERED or PROVISIONED 
         if self.state not in [ResourceState.NEW, ResourceState.DISCOVERED, 
@@ -769,43 +852,43 @@ class ResourceManager(Logger):
     
     def set_started(self):
         """ Mark ResourceManager as STARTED """
     
     def set_started(self):
         """ Mark ResourceManager as STARTED """
-        self._start_time = tnow()
-        self._state = ResourceState.STARTED
+        self.set_state(ResourceState.STARTED, "_start_time")
         
     def set_stopped(self):
         """ Mark ResourceManager as STOPPED """
         
     def set_stopped(self):
         """ Mark ResourceManager as STOPPED """
-        self._stop_time = tnow()
-        self._state = ResourceState.STOPPED
+        self.set_state(ResourceState.STOPPED, "_stop_time")
 
     def set_ready(self):
         """ Mark ResourceManager as READY """
 
     def set_ready(self):
         """ Mark ResourceManager as READY """
-        self._ready_time = tnow()
-        self._state = ResourceState.READY
+        self.set_state(ResourceState.READY, "_ready_time")
 
     def set_released(self):
         """ Mark ResourceManager as REALEASED """
 
     def set_released(self):
         """ Mark ResourceManager as REALEASED """
-        self._release_time = tnow()
-        self._state = ResourceState.RELEASED
+        self.set_state(ResourceState.RELEASED, "_release_time")
 
     def set_finished(self):
         """ Mark ResourceManager as FINISHED """
 
     def set_finished(self):
         """ Mark ResourceManager as FINISHED """
-        self._finish_time = tnow()
-        self._state = ResourceState.FINISHED
+        self.set_state(ResourceState.FINISHED, "_finish_time")
 
     def set_failed(self):
         """ Mark ResourceManager as FAILED """
 
     def set_failed(self):
         """ Mark ResourceManager as FAILED """
-        self._failed_time = tnow()
-        self._state = ResourceState.FAILED
+        self.set_state(ResourceState.FAILED, "_failed_time")
 
     def set_discovered(self):
         """ Mark ResourceManager as DISCOVERED """
 
     def set_discovered(self):
         """ Mark ResourceManager as DISCOVERED """
-        self._discover_time = tnow()
-        self._state = ResourceState.DISCOVERED
+        self.set_state(ResourceState.DISCOVERED, "_discover_time")
 
     def set_provisioned(self):
         """ Mark ResourceManager as PROVISIONED """
 
     def set_provisioned(self):
         """ Mark ResourceManager as PROVISIONED """
-        self._provision_time = tnow()
-        self._state = ResourceState.PROVISIONED
+        self.set_state(ResourceState.PROVISIONED, "_provision_time")
+
+    def set_state(self, state, state_time_attr):
+        # Ensure that RM state will not change after released
+        if self._state == ResourceState.RELEASED:
+            return 
+   
+        setattr(self, state_time_attr, tnow())
+        self._state = state
 
 class ResourceFactory(object):
     _resource_types = dict()
 
 class ResourceFactory(object):
     _resource_types = dict()
index 812a939..864750e 100644 (file)
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        ResourceAction
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, ResourceAction, failtrap
 from nepi.util.sshfuncs import ProcStatus
 
 import os
 import tempfile
 
 from nepi.util.sshfuncs import ProcStatus
 
 import os
 import tempfile
 
-@clsinit
+@clsinit_copy
 class Collector(ResourceManager):
     """ The collector is reponsible of collecting traces
     of a same type associated to RMs into a local directory.
 class Collector(ResourceManager):
     """ The collector is reponsible of collecting traces
     of a same type associated to RMs into a local directory.
@@ -69,7 +69,8 @@ class Collector(ResourceManager):
     @property
     def store_path(self):
         return self._store_path
     @property
     def store_path(self):
         return self._store_path
-    
+   
+    @failtrap
     def provision(self):
         trace_name = self.get("traceName")
         if not trace_name:
     def provision(self):
         trace_name = self.get("traceName")
         if not trace_name:
@@ -97,38 +98,40 @@ class Collector(ResourceManager):
 
         super(Collector, self).provision()
 
 
         super(Collector, self).provision()
 
+    @failtrap
     def deploy(self):
     def deploy(self):
-        try:
-            self.discover()
-            self.provision()
-        except:
-            self.fail()
-            raise
+        self.discover()
+        self.provision()
 
         super(Collector, self).deploy()
 
     def release(self):
 
         super(Collector, self).deploy()
 
     def release(self):
-        trace_name = self.get("traceName")
-        rename = self.get("rename") or trace_name
-
-        msg = "Collecting '%s' traces to local directory %s" % (
-            trace_name, self.store_path)
-        self.info(msg)
-
-        rms = self.get_connected()
-        for rm in rms:
-            result = self.ec.trace(rm.guid, trace_name)
-            fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, 
-                rename))
-            try:
-                f = open(fpath, "w")
-                f.write(result)
-                f.close()
-            except:
-                msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, 
-                        rm.guid, fpath)
-                self.error(msg)
-                continue
+        try:
+            trace_name = self.get("traceName")
+            rename = self.get("rename") or trace_name
+
+            msg = "Collecting '%s' traces to local directory %s" % (
+                trace_name, self.store_path)
+            self.info(msg)
+
+            rms = self.get_connected()
+            for rm in rms:
+                result = self.ec.trace(rm.guid, trace_name)
+                fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid, 
+                    rename))
+                try:
+                    f = open(fpath, "w")
+                    f.write(result)
+                    f.close()
+                except:
+                    msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name, 
+                            rm.guid, fpath)
+                    self.error(msg)
+                    continue
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(Collector, self).release()
 
 
         super(Collector, self).release()
 
index 763106c..1848e9a 100644 (file)
@@ -19,8 +19,8 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -31,7 +31,7 @@ import subprocess
 # TODO: Resolve wildcards in commands!!
 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 
 # TODO: Resolve wildcards in commands!!
 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 
-@clsinit
+@clsinit_copy
 class LinuxApplication(ResourceManager):
     """
     .. class:: Class Args :
 class LinuxApplication(ResourceManager):
     """
     .. class:: Class Args :
@@ -85,7 +85,6 @@ class LinuxApplication(ResourceManager):
     _help = "Runs an application on a Linux host with a BASH command "
     _backend_type = "linux"
 
     _help = "Runs an application on a Linux host with a BASH command "
     _backend_type = "linux"
 
-
     @classmethod
     def _register_attributes(cls):
         command = Attribute("command", "Command to execute at application start. "
     @classmethod
     def _register_attributes(cls):
         command = Attribute("command", "Command to execute at application start. "
@@ -270,7 +269,8 @@ class LinuxApplication(ResourceManager):
             out = int(out.strip())
 
         return out
             out = int(out.strip())
 
         return out
-            
+
+    @failtrap
     def provision(self):
         # create run dir for application
         self.node.mkdir(self.run_home)
     def provision(self):
         # create run dir for application
         self.node.mkdir(self.run_home)
@@ -302,7 +302,8 @@ class LinuxApplication(ResourceManager):
         # each step we check that the EC is still 
         for step in steps:
             if self.ec.abort:
         # each step we check that the EC is still 
         for step in steps:
             if self.ec.abort:
-                raise RuntimeError, "EC finished"
+                self.debug("Interrupting provisioning. EC says 'ABORT")
+                return
             
             ret = step()
             if ret:
             
             ret = step()
             if ret:
@@ -470,6 +471,7 @@ class LinuxApplication(ResourceManager):
             # replace application specific paths in the command
             return self.replace_paths(install)
 
             # replace application specific paths in the command
             return self.replace_paths(install)
 
+    @failtrap
     def deploy(self):
         # Wait until node is associated and deployed
         node = self.node
     def deploy(self):
         # Wait until node is associated and deployed
         node = self.node
@@ -477,17 +479,14 @@ class LinuxApplication(ResourceManager):
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                command = self.get("command") or ""
-                self.info("Deploying command '%s' " % command)
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            command = self.get("command") or ""
+            self.info("Deploying command '%s' " % command)
+            self.discover()
+            self.provision()
 
             super(LinuxApplication, self).deploy()
 
             super(LinuxApplication, self).deploy()
-    
+   
+    @failtrap
     def start(self):
         command = self.get("command")
 
     def start(self):
         command = self.get("command")
 
@@ -498,15 +497,10 @@ class LinuxApplication(ResourceManager):
             # installation), then the application is directly marked as FINISHED
             self.set_finished()
         else:
             # installation), then the application is directly marked as FINISHED
             self.set_finished()
         else:
-
-            try:
-                if self.in_foreground:
-                    self._run_in_foreground()
-                else:
-                    self._run_in_background()
-            except:
-                self.fail()
-                return
+            if self.in_foreground:
+                self._run_in_foreground()
+            else:
+                self._run_in_background()
 
             super(LinuxApplication, self).start()
 
 
             super(LinuxApplication, self).start()
 
@@ -514,6 +508,7 @@ class LinuxApplication(ResourceManager):
         command = self.get("command")
         sudo = self.get("sudo") or False
         x11 = self.get("forwardX11")
         command = self.get("command")
         sudo = self.get("sudo") or False
         x11 = self.get("forwardX11")
+        env = self.get("env")
 
         # For a command being executed in foreground, if there is stdin,
         # it is expected to be text string not a file or pipe
 
         # For a command being executed in foreground, if there is stdin,
         # it is expected to be text string not a file or pipe
@@ -526,7 +521,7 @@ class LinuxApplication(ResourceManager):
         # to be able to kill the process from the stop method.
         # We also set blocking = False, since we don't want the
         # thread to block until the execution finishes.
         # to be able to kill the process from the stop method.
         # We also set blocking = False, since we don't want the
         # thread to block until the execution finishes.
-        (out, err), self._proc = self.execute_command(self, command, 
+        (out, err), self._proc = self.execute_command(command, 
                 env = env,
                 sudo = sudo,
                 stdin = stdin,
                 env = env,
                 sudo = sudo,
                 stdin = stdin,
@@ -582,7 +577,8 @@ class LinuxApplication(ResourceManager):
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
-        
+    
+    @failtrap
     def stop(self):
         """ Stops application execution
         """
     def stop(self):
         """ Stops application execution
         """
@@ -609,22 +605,25 @@ class LinuxApplication(ResourceManager):
                     if proc.poll() or err:
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                     if proc.poll() or err:
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
-                        self.fail()
-                        return
         
             super(LinuxApplication, self).stop()
 
     def release(self):
         self.info("Releasing resource")
 
         
             super(LinuxApplication, self).stop()
 
     def release(self):
         self.info("Releasing resource")
 
-        tear_down = self.get("tearDown")
-        if tear_down:
-            self.node.execute(tear_down)
+        try:
+            tear_down = self.get("tearDown")
+            if tear_down:
+                self.node.execute(tear_down)
 
 
-        self.stop()
+            self.stop()
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(LinuxApplication, self).release()
 
         super(LinuxApplication, self).release()
-   
+        
     @property
     def state(self):
         """ Returns the state of the application
     @property
     def state(self):
         """ Returns the state of the application
index 46a3cc4..c13c092 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.resource import clsinit_copy, ResourceState, \
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    reschedule_delay
+    reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow, tdiffsec
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -44,25 +44,22 @@ class LinuxCCNApplication(LinuxApplication):
         if self.ccnd: return self.ccnd.node
         return None
 
         if self.ccnd: return self.ccnd.node
         return None
 
+    @failtrap
     def deploy(self):
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
     def deploy(self):
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                command = self.get("command") or ""
+            command = self.get("command") or ""
 
 
-                self.info("Deploying command '%s' " % command)
-                
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            self.info("Deploying command '%s' " % command)
+            
+            if not self.get("env"):
+                self.set("env", self._environment)
+
+            self.discover()
+            self.provision()
 
 
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
index 1fa93cc..bb2ae46 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction, reschedule_delay
+    ResourceAction, reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnr import LinuxCCNR
 from nepi.util.timefuncs import tnow
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnr import LinuxCCNR
 from nepi.util.timefuncs import tnow
@@ -72,6 +72,7 @@ class LinuxCCNContent(LinuxApplication):
         if self.ccnr: return self.ccnr.node
         return None
 
         if self.ccnr: return self.ccnr.node
         return None
 
+    @failtrap
     def deploy(self):
         if not self.ccnr or self.ccnr.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
     def deploy(self):
         if not self.ccnr or self.ccnr.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
@@ -79,26 +80,22 @@ class LinuxCCNContent(LinuxApplication):
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                if not self.get("command"):
-                    self.set("command", self._start_command)
+            if not self.get("command"):
+                self.set("command", self._start_command)
 
 
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
 
-                # set content to stdin, so the content will be
-                # uploaded during provision
-                self.set("stdin", self.get("content"))
+            # set content to stdin, so the content will be
+            # uploaded during provision
+            self.set("stdin", self.get("content"))
 
 
-                command = self.get("command")
+            command = self.get("command")
 
 
-                self.info("Deploying command '%s' " % command)
+            self.info("Deploying command '%s' " % command)
 
 
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return 
+            self.discover()
+            self.provision()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -125,6 +122,7 @@ class LinuxCCNContent(LinuxApplication):
             self.error(msg, out, err)
             raise RuntimeError, msg
 
             self.error(msg, out, err)
             raise RuntimeError, msg
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -134,7 +132,7 @@ class LinuxCCNContent(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            sef.fail()
+            raise RuntimeError, msg
 
     @property
     def _start_command(self):
 
     @property
     def _start_command(self):
index 71eb12a..0962936 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
-        ResourceState, reschedule_delay
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 from nepi.util.timefuncs import tnow, tdiffsec
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -136,6 +136,7 @@ class LinuxCCND(LinuxApplication):
     def path(self):
         return "PATH=$PATH:${BIN}/%s/" % self.version 
 
     def path(self):
         return "PATH=$PATH:${BIN}/%s/" % self.version 
 
+    @failtrap
     def deploy(self):
         if not self.node or self.node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
     def deploy(self):
         if not self.node or self.node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
@@ -143,42 +144,38 @@ class LinuxCCND(LinuxApplication):
             # ccnd needs to wait until node is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             # ccnd needs to wait until node is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                if not self.get("command"):
-                    self.set("command", self._start_command)
-                
-                if not self.get("depends"):
-                    self.set("depends", self._dependencies)
+            if not self.get("command"):
+                self.set("command", self._start_command)
+            
+            if not self.get("depends"):
+                self.set("depends", self._dependencies)
 
 
-                if not self.get("sources"):
-                    self.set("sources", self._sources)
+            if not self.get("sources"):
+                self.set("sources", self._sources)
 
 
-                sources = self.get("sources")
-                source = sources.split(" ")[0]
-                basename = os.path.basename(source)
-                self._version = ( basename.strip().replace(".tar.gz", "")
-                        .replace(".tar","")
-                        .replace(".gz","")
-                        .replace(".zip","") )
+            sources = self.get("sources")
+            source = sources.split(" ")[0]
+            basename = os.path.basename(source)
+            self._version = ( basename.strip().replace(".tar.gz", "")
+                    .replace(".tar","")
+                    .replace(".gz","")
+                    .replace(".zip","") )
 
 
-                if not self.get("build"):
-                    self.set("build", self._build)
+            if not self.get("build"):
+                self.set("build", self._build)
 
 
-                if not self.get("install"):
-                    self.set("install", self._install)
+            if not self.get("install"):
+                self.set("install", self._install)
 
 
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
 
-                command = self.get("command")
+            command = self.get("command")
 
 
-                self.info("Deploying command '%s' " % command)
+            self.info("Deploying command '%s' " % command)
 
 
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            self.discover()
+            self.provision()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -202,6 +199,7 @@ class LinuxCCND(LinuxApplication):
                 env = env,
                 raise_on_error = True)
 
                 env = env,
                 raise_on_error = True)
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -211,8 +209,9 @@ class LinuxCCND(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
 
+    @failtrap
     def stop(self):
         command = self.get('command') or ''
         
     def stop(self):
         command = self.get('command') or ''
         
@@ -245,7 +244,7 @@ class LinuxCCND(LinuxApplication):
         state_check_delay = 0.5
         if self._state == ResourceState.STARTED and \
                 tdiffsec(tnow(), self._last_state_check) > state_check_delay:
         state_check_delay = 0.5
         if self._state == ResourceState.STARTED and \
                 tdiffsec(tnow(), self._last_state_check) > state_check_delay:
-            (out, err), proc = self._ccndstatus
+            (out, err), proc = self._ccndstatus()
 
             retcode = proc.poll()
 
 
             retcode = proc.poll()
 
@@ -262,7 +261,6 @@ class LinuxCCND(LinuxApplication):
 
         return self._state
 
 
         return self._state
 
-    @property
     def _ccndstatus(self):
         env = self.get('env') or ""
         environ = self.node.format_environment(env, inline = True)
     def _ccndstatus(self):
         env = self.get('env') or ""
         environ = self.node.format_environment(env, inline = True)
index 7821597..d828950 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-    reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.ccn.ccnpingserver import LinuxCCNPingServer
 from nepi.util.timefuncs import tnow, tdiffsec
 
 from nepi.resources.linux.ccn.ccnpingserver import LinuxCCNPingServer
 from nepi.util.timefuncs import tnow, tdiffsec
 
@@ -65,6 +65,7 @@ class LinuxCCNPing(LinuxCCNPingServer):
         if ccnpingserver: return ccnpingserver[0]
         return None
 
         if ccnpingserver: return ccnpingserver[0]
         return None
 
+    @failtrap
     def start(self):
         if not self.ccnpingserver or \
                 self.ccnpingserver.state < ResourceState.STARTED:
     def start(self):
         if not self.ccnpingserver or \
                 self.ccnpingserver.state < ResourceState.STARTED:
index b566c78..6e87b27 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-    reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
 from nepi.util.timefuncs import tnow, tdiffsec
 
 from nepi.resources.linux.ccn.ccnapplication import LinuxCCNApplication
 from nepi.util.timefuncs import tnow, tdiffsec
 
@@ -54,6 +54,7 @@ class LinuxCCNPingServer(LinuxCCNApplication):
         super(LinuxCCNPingServer, self).__init__(ec, guid)
         self._home = "ccnping-serv-%s" % self.guid
 
         super(LinuxCCNPingServer, self).__init__(ec, guid)
         self._home = "ccnping-serv-%s" % self.guid
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 46c3f3b..5319997 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import clsinit_copy, ResourceState, \
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction, reschedule_delay
+    ResourceAction, reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow
@@ -200,6 +200,7 @@ class LinuxCCNR(LinuxApplication):
         if self.ccnd: return self.ccnd.node
         return None
 
         if self.ccnd: return self.ccnd.node
         return None
 
+    @failtrap
     def deploy(self):
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
     def deploy(self):
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- CCND state %s " % self.ccnd.state )
@@ -207,22 +208,18 @@ class LinuxCCNR(LinuxApplication):
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                if not self.get("command"):
-                    self.set("command", self._start_command)
+            if not self.get("command"):
+                self.set("command", self._start_command)
 
 
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
 
-                command = self.get("command")
+            command = self.get("command")
 
 
-                self.info("Deploying command '%s' " % command)
+            self.info("Deploying command '%s' " % command)
 
 
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return 
+            self.discover()
+            self.provision()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -255,6 +252,7 @@ class LinuxCCNR(LinuxApplication):
                 env = env,
                 raise_on_error = True)
 
                 env = env,
                 raise_on_error = True)
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -264,7 +262,7 @@ class LinuxCCNR(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
     @property
     def _start_command(self):
 
     @property
     def _start_command(self):
index 48c2d6d..490d8f6 100644 (file)
@@ -20,7 +20,7 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import clsinit_copy, ResourceState, \
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-    ResourceAction, reschedule_delay
+    ResourceAction, reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.ccn.ccnd import LinuxCCND
 from nepi.util.timefuncs import tnow
@@ -108,35 +108,32 @@ class LinuxFIBEntry(LinuxApplication):
             return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
 
         return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
             return self.ec.trace(self._traceroute, "stdout", attr, block, offset)
 
         return super(LinuxFIBEntry, self).trace(name, attr, block, offset)
-        
+    
+    @failtrap
     def deploy(self):
         # Wait until associated ccnd is provisioned
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
     def deploy(self):
         # Wait until associated ccnd is provisioned
         if not self.ccnd or self.ccnd.state < ResourceState.READY:
             # ccnr needs to wait until ccnd is deployed and running
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                if not self.get("ip"):
-                    host = self.get("host")
-                    ip = socket.gethostbyname(host)
-                    self.set("ip", ip)
+            if not self.get("ip"):
+                host = self.get("host")
+                ip = socket.gethostbyname(host)
+                self.set("ip", ip)
 
 
-                if not self.get("command"):
-                    self.set("command", self._start_command)
+            if not self.get("command"):
+                self.set("command", self._start_command)
 
 
-                if not self.get("env"):
-                    self.set("env", self._environment)
+            if not self.get("env"):
+                self.set("env", self._environment)
 
 
-                command = self.get("command")
+            command = self.get("command")
 
 
-                self.info("Deploying command '%s' " % command)
+            self.info("Deploying command '%s' " % command)
 
 
-                self.discover()
-                self.provision()
-                self.configure()
-            except:
-                self.fail()
-                return
+            self.discover()
+            self.provision()
+            self.configure()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -194,6 +191,7 @@ class LinuxFIBEntry(LinuxApplication):
             # schedule mtr deploy
             self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
 
             # schedule mtr deploy
             self.ec.deploy(guids=[self._traceroute], group = self.deployment_group)
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -203,8 +201,9 @@ class LinuxFIBEntry(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
 
+    @failtrap
     def stop(self):
         command = self.get('command')
         env = self.get('env')
     def stop(self):
         command = self.get('command')
         env = self.get('env')
@@ -221,7 +220,7 @@ class LinuxFIBEntry(LinuxApplication):
             if err:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
             if err:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
-                self.fail()
+                raise RuntimeError, msg
 
     @property
     def _start_command(self):
 
     @property
     def _start_command(self):
index 2ed5c01..429051e 100644 (file)
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState
 
 
-@clsinit
+@clsinit_copy
 class LinuxChannel(ResourceManager):
     _rtype = "LinuxChannel"
     _help = "Represents a wireless channel on a network of Linux hosts"
 class LinuxChannel(ResourceManager):
     _rtype = "LinuxChannel"
     _help = "Represents a wireless channel on a network of Linux hosts"
@@ -35,3 +36,4 @@ class LinuxChannel(ResourceManager):
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
+
index a170f41..9ccdc4f 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Types, Flags
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Types, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.linux.channel import LinuxChannel
 
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.linux.channel import LinuxChannel
 
@@ -33,7 +33,7 @@ import time
 # TODO: UP, MTU attributes!
 
 
 # TODO: UP, MTU attributes!
 
 
-@clsinit
+@clsinit_copy
 class LinuxInterface(ResourceManager):
     _rtype = "LinuxInterface"
     _help = "Controls network devices on Linux hosts through the ifconfig tool"
 class LinuxInterface(ResourceManager):
     _rtype = "LinuxInterface"
     _help = "Controls network devices on Linux hosts through the ifconfig tool"
@@ -102,6 +102,7 @@ class LinuxInterface(ResourceManager):
         if chan: return chan[0]
         return None
 
         if chan: return chan[0]
         return None
 
+    @failtrap
     def discover(self):
         devname = self.get("deviceName")
         ip4 = self.get("ip4")
     def discover(self):
         devname = self.get("deviceName")
         ip4 = self.get("ip4")
@@ -183,6 +184,7 @@ class LinuxInterface(ResourceManager):
 
         super(LinuxInterface, self).discover()
 
 
         super(LinuxInterface, self).discover()
 
+    @failtrap
     def provision(self):
         devname = self.get("deviceName")
         ip4 = self.get("ip4")
     def provision(self):
         devname = self.get("deviceName")
         ip4 = self.get("ip4")
@@ -226,6 +228,7 @@ class LinuxInterface(ResourceManager):
 
         super(LinuxInterface, self).provision()
 
 
         super(LinuxInterface, self).provision()
 
+    @failtrap
     def deploy(self):
         # Wait until node is provisioned
         node = self.node
     def deploy(self):
         # Wait until node is provisioned
         node = self.node
@@ -238,19 +241,20 @@ class LinuxInterface(ResourceManager):
         else:
             # Verify if the interface exists in node. If not, configue
             # if yes, load existing configuration
         else:
             # Verify if the interface exists in node. If not, configue
             # if yes, load existing configuration
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return 
+            self.discover()
+            self.provision()
 
             super(LinuxInterface, self).deploy()
 
     def release(self):
 
             super(LinuxInterface, self).deploy()
 
     def release(self):
-        tear_down = self.get("tearDown")
-        if tear_down:
-            self.execute(tear_down)
+        try:
+            tear_down = self.get("tearDown")
+            if tear_down:   
+                self.execute(tear_down)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(LinuxInterface, self).release()
 
 
         super(LinuxInterface, self).release()
 
index 85a9f25..6085721 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -83,6 +83,7 @@ class LinuxMtr(LinuxApplication):
         self._home = "mtr-%s" % self.guid
         self._sudo_kill = True
 
         self._home = "mtr-%s" % self.guid
         self._sudo_kill = True
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 9cab4ca..953793a 100644 (file)
@@ -17,9 +17,9 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
@@ -57,7 +57,7 @@ class OSType:
     UBUNTU = "ubuntu"
     DEBIAN = "debian"
 
     UBUNTU = "ubuntu"
     DEBIAN = "debian"
 
-@clsinit
+@clsinit_copy
 class LinuxNode(ResourceManager):
     """
     .. class:: Class Args :
 class LinuxNode(ResourceManager):
     """
     .. class:: Class Args :
@@ -168,14 +168,20 @@ class LinuxNode(ResourceManager):
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
                 flags = Flags.ExecReadOnly)
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
                 flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
                 flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
@@ -309,7 +315,6 @@ class LinuxNode(ResourceManager):
             time.sleep(min(30.0, retrydelay))
             retrydelay *= 1.5
 
             time.sleep(min(30.0, retrydelay))
             retrydelay *= 1.5
 
-
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
@@ -323,10 +328,10 @@ class LinuxNode(ResourceManager):
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
+    @failtrap
     def provision(self):
         # check if host is alive
         if not self.is_alive():
     def provision(self):
         # check if host is alive
         if not self.is_alive():
-            
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
@@ -353,14 +358,12 @@ class LinuxNode(ResourceManager):
 
         super(LinuxNode, self).provision()
 
 
         super(LinuxNode, self).provision()
 
+    @failtrap
     def deploy(self):
         if self.state == ResourceState.NEW:
     def deploy(self):
         if self.state == ResourceState.NEW:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            self.info("Deploying node")
+            self.discover()
+            self.provision()
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
@@ -374,19 +377,24 @@ class LinuxNode(ResourceManager):
         super(LinuxNode, self).deploy()
 
     def release(self):
         super(LinuxNode, self).deploy()
 
     def release(self):
-        # Node needs to wait until all associated RMs are released
-        # to be released
-        rms = self.get_connected()
-        for rm in rms:
-            if rm.state < ResourceState.STOPPED:
-                self.ec.schedule(reschedule_delay, self.release)
-                return 
-
-        tear_down = self.get("tearDown")
-        if tear_down:
-            self.execute(tear_down)
+        try:
+            rms = self.get_connected()
+            for rm in rms:
+                # Node needs to wait until all associated RMs are released
+                # before it can be released
+                if rm.state < ResourceState.STOPPED:
+                    self.ec.schedule(reschedule_delay, self.release)
+                    return 
+
+            tear_down = self.get("tearDown")
+            if tear_down:
+                self.execute(tear_down)
 
 
-        self.clean_processes()
+            self.clean_processes()
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(LinuxNode, self).release()
 
 
         super(LinuxNode, self).release()
 
@@ -627,7 +635,6 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
 
         return (out, err), proc
 
-
     def upload(self, src, dst, text = False, overwrite = True):
         """ Copy content to destination
 
     def upload(self, src, dst, text = False, overwrite = True):
         """ Copy content to destination
 
index e0d6452..ec874bc 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -133,6 +133,7 @@ class LinuxNPing(LinuxApplication):
         self._home = "nping-%s" % self.guid
         self._sudo_kill = True
 
         self._home = "nping-%s" % self.guid
         self._sudo_kill = True
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 3447658..d085b11 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -184,6 +184,7 @@ class LinuxPing(LinuxApplication):
         super(LinuxPing, self).__init__(ec, guid)
         self._home = "ping-%s" % self.guid
 
         super(LinuxPing, self).__init__(ec, guid)
         self._home = "ping-%s" % self.guid
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 3fa11f2..e9955f4 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -316,6 +316,7 @@ class LinuxTcpdump(LinuxApplication):
         self._home = "tcpdump-%s" % self.guid
         self._sudo_kill = True
 
         self._home = "tcpdump-%s" % self.guid
         self._sudo_kill = True
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 2e03f62..99eea6b 100644 (file)
@@ -18,7 +18,7 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import clsinit_copy 
+from nepi.execution.resource import clsinit_copy, failtrap 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -64,6 +64,7 @@ class LinuxTraceroute(LinuxApplication):
         super(LinuxTraceroute, self).__init__(ec, guid)
         self._home = "traceroute-%s" % self.guid
 
         super(LinuxTraceroute, self).__init__(ec, guid)
         self._home = "traceroute-%s" % self.guid
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
index 1b7ac9c..6ad0085 100644 (file)
@@ -19,8 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-        reschedule_delay
-from nepi.execution.resource import clsinit_copy 
+        reschedule_delay, failtrap 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.timefuncs import tnow
 
@@ -214,6 +213,7 @@ class LinuxUdpTest(LinuxApplication):
         super(LinuxUdpTest, self).__init__(ec, guid)
         self._home = "udptest-%s" % self.guid
 
         super(LinuxUdpTest, self).__init__(ec, guid)
         self._home = "udptest-%s" % self.guid
 
+    @failtrap
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
     def deploy(self):
         if not self.get("command"):
             self.set("command", self._start_command)
@@ -247,6 +247,7 @@ class LinuxUdpTest(LinuxApplication):
             # finished to continue )
             self._run_in_background()
     
             # finished to continue )
             self._run_in_background()
     
+    @failtrap
     def start(self):
         if self.get("s") == True:
             # Server is already running
     def start(self):
         if self.get("s") == True:
             # Server is already running
@@ -258,7 +259,7 @@ class LinuxUdpTest(LinuxApplication):
             else:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
             else:
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
-                self.fail()
+                raise RuntimeError, err
         else:
             super(LinuxUdpTest, self).start()
  
         else:
             super(LinuxUdpTest, self).start()
  
index 4dae96c..1efe230 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-        reschedule_delay
+        reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
 from nepi.resources.linux.application import LinuxApplication
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -161,6 +161,7 @@ class UdpTunnel(LinuxApplication):
         port = self.wait_local_port(endpoint)
         return (port, pid, ppid)
 
         port = self.wait_local_port(endpoint)
         return (port, pid, ppid)
 
+    @failtrap
     def provision(self):
         # create run dir for tunnel on each node 
         self.endpoint1.node.mkdir(self.run_home(self.endpoint1))
     def provision(self):
         # create run dir for tunnel on each node 
         self.endpoint1.node.mkdir(self.run_home(self.endpoint1))
@@ -190,21 +191,19 @@ class UdpTunnel(LinuxApplication):
  
         self.set_provisioned()
 
  
         self.set_provisioned()
 
+    @failtrap
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            self.discover()
+            self.provision()
  
             self.debug("----- READY ---- ")
             self.set_ready()
 
  
             self.debug("----- READY ---- ")
             self.set_ready()
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -214,8 +213,9 @@ class UdpTunnel(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
 
+    @failtrap
     def stop(self):
         """ Stops application execution
         """
     def stop(self):
         """ Stops application execution
         """
@@ -234,8 +234,7 @@ class UdpTunnel(LinuxApplication):
                     # check if execution errors occurred
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
                     # check if execution errors occurred
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
-                    self.fail()
-                    return
+                    raise RuntimeError, msg
 
             self.set_stopped()
 
 
             self.set_stopped()
 
index f0fc5f1..54f8854 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.node import OMFNode
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.node import OMFNode
@@ -147,6 +147,7 @@ class OMFApplication(OMFResource):
 
             return True
 
 
             return True
 
+    @failtrap
     def deploy(self):
         """ Deploy the RM. It means nothing special for an application 
         for now (later it will be upload sources, ...)
     def deploy(self):
         """ Deploy the RM. It means nothing special for an application 
         for now (later it will be upload sources, ...)
@@ -166,8 +167,7 @@ class OMFApplication(OMFResource):
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if self.get('sources'):
             gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
 
         if self.get('sources'):
             gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
@@ -177,7 +177,7 @@ class OMFApplication(OMFResource):
 
         super(OMFApplication, self).deploy()
 
 
         super(OMFApplication, self).deploy()
 
-
+    @failtrap
     def start(self):
         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
          to execute the application. 
     def start(self):
         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
          to execute the application. 
@@ -187,8 +187,7 @@ class OMFApplication(OMFResource):
         if not (self.get('appid') and self.get('path')) :
             msg = "Application's information are not initialized"
             self.error(msg)
         if not (self.get('appid') and self.get('path')) :
             msg = "Application's information are not initialized"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if not self.get('args'):
             self.set('args', " ")
 
         if not self.get('args'):
             self.set('args', " ")
@@ -207,11 +206,11 @@ class OMFApplication(OMFResource):
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
             raise
 
         super(OMFApplication, self).start()
 
             raise
 
         super(OMFApplication, self).start()
 
+    @failtrap
     def stop(self):
         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
         kill the application. 
     def stop(self):
         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
         kill the application. 
@@ -223,8 +222,7 @@ class OMFApplication(OMFResource):
         except AttributeError:
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
         except AttributeError:
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise
 
         super(OMFApplication, self).stop()
         self.set_finished()
 
         super(OMFApplication, self).stop()
         self.set_finished()
@@ -233,10 +231,15 @@ class OMFApplication(OMFResource):
         """ Clean the RM at the end of the experiment and release the API.
 
         """
         """ Clean the RM at the end of the experiment and release the API.
 
         """
-        if self._omf_api :
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        try:
+            if self._omf_api :
+                OMFAPIFactory.release_api(self.get('xmppSlice'), 
+                    self.get('xmppHost'), self.get('xmppPort'), 
+                    self.get('xmppPassword'), exp_id = self.exp_id)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(OMFApplication, self).release()
 
 
         super(OMFApplication, self).release()
 
index b5d96a3..0e99567 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
@@ -121,18 +121,7 @@ class OMFChannel(OMFResource):
                     res.append(couple)
         return res
 
                     res.append(couple)
         return res
 
-    def discover(self):
-        """ Discover the availables channels
-
-        """
-        pass
-     
-    def provision(self):
-        """ Provision some availables channels
-
-        """
-        pass
-
+    @failtrap
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the channel.
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the channel.
@@ -147,58 +136,44 @@ class OMFChannel(OMFResource):
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if not self.get('channel'):
             msg = "Channel's value is not initialized"
             self.error(msg)
 
         if not self.get('channel'):
             msg = "Channel's value is not initialized"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
+
+        self._nodes_guid = self._get_target(self._connections)
 
 
-        self._nodes_guid = self._get_target(self._connections) 
         if self._nodes_guid == "reschedule" :
             self.ec.schedule("2s", self.deploy)
         if self._nodes_guid == "reschedule" :
             self.ec.schedule("2s", self.deploy)
-            return False
-
-        try:
-            for couple in self._nodes_guid:
-                #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
-                attrval = self.get('channel')
-                attrname = "net/%s/%s" % (couple[1], 'channel')
-                self._omf_api.configure(couple[0], attrname, attrval)
-        except AttributeError:
-            msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.error(msg)
-            self.fail()
-            return
-
-        super(OMFChannel, self).deploy()
-
-    def start(self):
-        """ Start the RM. It means nothing special for a channel for now
-        It becomes STARTED as soon as this method starts.
-
-        """
-
-        super(OMFChannel, self).start()
-
-    def stop(self):
-        """ Stop the RM. It means nothing special for a channel for now
-        It becomes STOPPED as soon as this method is called
-
-        """
-        super(OMFChannel, self).stop()
-        self.set_finished()
+        else:
+            try:
+                for couple in self._nodes_guid:
+                    #print "Couple node/alias : " + couple[0] + "  ,  " + couple[1]
+                    attrval = self.get('channel')
+                    attrname = "net/%s/%s" % (couple[1], 'channel')
+                    self._omf_api.configure(couple[0], attrname, attrval)
+            except AttributeError:
+                msg = "Credentials are not initialzed. XMPP Connections impossible"
+                self.error(msg)
+                raise
+
+            super(OMFChannel, self).deploy()
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
-        if self._omf_api :
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        try:
+            if self._omf_api :
+                OMFAPIFactory.release_api(self.get('xmppSlice'), 
+                    self.get('xmppHost'), self.get('xmppPort'), 
+                    self.get('xmppPassword'), exp_id = self.exp_id)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(OMFChannel, self).release()
 
 
         super(OMFChannel, self).release()
 
index e61ad41..81e3b60 100644 (file)
@@ -18,8 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Julien Tribino <julien.tribino@inria.fr>
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.node import OMFNode
 from nepi.execution.attribute import Attribute, Flags 
 
 from nepi.resources.omf.node import OMFNode
@@ -120,7 +120,6 @@ class OMFWifiInterface(OMFResource):
         if rm_list: return rm_list[0]
         return None
 
         if rm_list: return rm_list[0]
         return None
 
-
     def configure_iface(self):
         """ Configure the interface without the ip
 
     def configure_iface(self):
         """ Configure the interface without the ip
 
@@ -165,6 +164,7 @@ class OMFWifiInterface(OMFResource):
 
         return True
 
 
         return True
 
+    @failtrap
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the interface.
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the interface.
@@ -183,21 +183,18 @@ class OMFWifiInterface(OMFResource):
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if not (self.get('mode') and self.get('type') and self.get('essid') \
                 and self.get('ip')):
             msg = "Interface's variable are not initialized"
             self.error(msg)
 
         if not (self.get('mode') and self.get('type') and self.get('essid') \
                 and self.get('ip')):
             msg = "Interface's variable are not initialized"
             self.error(msg)
-            self.fail()
-            return False
+            raise RuntimeError, msg
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
-            self.fail()
-            return False
+            raise RuntimeError, msg
 
         # Just for information
         self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
 
         # Just for information
         self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
@@ -205,43 +202,25 @@ class OMFWifiInterface(OMFResource):
             self.get('essid') + " : " + self.get('ip'))
     
         # Check if the node is already deployed
             self.get('essid') + " : " + self.get('ip'))
     
         # Check if the node is already deployed
-        chk1 = True
         if self.state < ResourceState.PROVISIONED:
         if self.state < ResourceState.PROVISIONED:
-            chk1 = self.configure_iface()
-        if chk1:
-            chk2 = self.configure_ip()
+            if self.configure_iface():
+                self.configure_ip()
 
 
-        if not (chk1 and chk2) :
-            return False
-            
         super(OMFWifiInterface, self).deploy()
         super(OMFWifiInterface, self).deploy()
-        return True
-
-
-    def start(self):
-        """ Start the RM. It means nothing special for a channel for now
-        It becomes STARTED as soon as this method starts.
-
-        """
-
-        super(OMFWifiInterface, self).start()
-
-    def stop(self):
-        """ Stop the RM. It means nothing special for a channel for now
-        It becomes STOPPED as soon as this method is called
-
-        """
-        super(OMFWifiInterface, self).stop()
-        self.set_finished()
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
-        if self._omf_api :
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        try:
+            if self._omf_api :
+                OMFAPIFactory.release_api(self.get('xmppSlice'), 
+                    self.get('xmppHost'), self.get('xmppPort'), 
+                    self.get('xmppPassword'), exp_id = self.exp_id)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(OMFWifiInterface, self).release()
 
 
         super(OMFWifiInterface, self).release()
 
index 99bedf6..4da85d3 100644 (file)
 #         Julien Tribino <julien.tribino@inria.fr>
 
 
 #         Julien Tribino <julien.tribino@inria.fr>
 
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
 import time
 
 from nepi.execution.attribute import Attribute, Flags 
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
 import time
 
-
 @clsinit_copy
 class OMFNode(OMFResource):
     """
 @clsinit_copy
 class OMFNode(OMFResource):
     """
@@ -98,6 +97,7 @@ class OMFNode(OMFResource):
 
         return False
 
 
         return False
 
+    @failtrap
     def deploy(self):
         """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol 
             to enroll the node into the experiment.
     def deploy(self):
         """ Deploy the RM. It means : Send Xmpp Message Using OMF protocol 
             to enroll the node into the experiment.
@@ -112,63 +112,37 @@ class OMFNode(OMFResource):
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
         if not self._omf_api :
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            return
+            raise RuntimeError, msg
 
         if not self.get('hostname') :
             msg = "Hostname's value is not initialized"
             self.error(msg)
 
         if not self.get('hostname') :
             msg = "Hostname's value is not initialized"
             self.error(msg)
-            self.fail()
-            return False
+            raise RuntimeError, msg
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
-            self.fail()
-            #raise AttributeError, msg
+            raise 
 
         super(OMFNode, self).deploy()
 
 
         super(OMFNode, self).deploy()
 
-    def discover(self):
-        """ Discover the availables nodes
-
-        """
-        pass
-     
-    def provision(self):
-        """ Provision some availables nodes
-
-        """
-        pass
-
-    def start(self):
-        """Start the RM. It means nothing special for an interface for now
-           It becomes STARTED as soon as this method starts.
-
-        """
-
-        super(OMFNode, self).start()
-
-    def stop(self):
-        """Stop the RM. It means nothing special for an interface for now
-           It becomes STOPPED as soon as this method stops
-
-        """
-        super(OMFNode, self).stop()
-        self.set_finished()
-
     def release(self):
         """Clean the RM at the end of the experiment
 
         """
     def release(self):
         """Clean the RM at the end of the experiment
 
         """
-        if self._omf_api :
-            self._omf_api.release(self.get('hostname'))
-
-            OMFAPIFactory.release_api(self.get('xmppSlice'), 
-                self.get('xmppHost'), self.get('xmppPort'), 
-                self.get('xmppPassword'), exp_id = self.exp_id)
+        try:
+            if self._omf_api :
+                self._omf_api.release(self.get('hostname'))
+
+                OMFAPIFactory.release_api(self.get('xmppSlice'), 
+                    self.get('xmppHost'), self.get('xmppPort'), 
+                    self.get('xmppPassword'), exp_id = self.exp_id)
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(OMFNode, self).release()
 
 
         super(OMFNode, self).release()
 
index ed2de65..632d201 100644 (file)
@@ -19,8 +19,8 @@
 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
 
 class ResourceGateway:
     """
 
 class ResourceGateway:
     """
@@ -38,7 +38,7 @@ class ResourceGateway:
         "nicta" : "??.??.??",
     })
 
         "nicta" : "??.??.??",
     })
 
-@clsinit
+@clsinit_copy
 class OMFResource(ResourceManager):
     """
     Generic resource gathering XMPP credential information and common methods
 class OMFResource(ResourceManager):
     """
     Generic resource gathering XMPP credential information and common methods
index b26a4f3..57196b3 100644 (file)
@@ -19,8 +19,8 @@
 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
-        reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay, failtrap
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
 from nepi.util.execfuncs import lexec
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
 from nepi.util.execfuncs import lexec
index bffc61e..51628fd 100644 (file)
@@ -19,7 +19,8 @@
 #         Alexandros Kouvakas <alexandros.kouvakas@inria.fr>
 
 
 #         Alexandros Kouvakas <alexandros.kouvakas@inria.fr>
 
 
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, failtrap
 from nepi.execution.attribute import Attribute, Flags
 from nepi.resources.planetlab.node import PlanetlabNode        
 from nepi.resources.linux.application import LinuxApplication
 from nepi.execution.attribute import Attribute, Flags
 from nepi.resources.planetlab.node import PlanetlabNode        
 from nepi.resources.linux.application import LinuxApplication
@@ -114,6 +115,7 @@ class OVSWitch(LinuxApplication):
         # TODO: Validate!
         return True
 
         # TODO: Validate!
         return True
 
+    @failtrap
     def provision(self):
         # create home dir for ovs
         self.node.mkdir(self.ovs_home)
     def provision(self):
         # create home dir for ovs
         self.node.mkdir(self.ovs_home)
@@ -136,13 +138,16 @@ class OVSWitch(LinuxApplication):
                 stderr = "check_cmd_stderr")
 
         (out, err), proc = self.node.check_output(self.ovs_checks, 'check_cmd_exitcode')
                 stderr = "check_cmd_stderr")
 
         (out, err), proc = self.node.check_output(self.ovs_checks, 'check_cmd_exitcode')
+        
         if out != "0\n":
             msg = "Command sliver-ovs does not exist on the VM"         
             self.debug(msg)
             raise RuntimeError, msg
         if out != "0\n":
             msg = "Command sliver-ovs does not exist on the VM"         
             self.debug(msg)
             raise RuntimeError, msg
+
         msg = "Command sliver-ovs exists" 
         self.debug(msg)                                                
 
         msg = "Command sliver-ovs exists" 
         self.debug(msg)                                                
 
+    @failtrap
     def deploy(self):
         """ Wait until node is associated and deployed
         """
     def deploy(self):
         """ Wait until node is associated and deployed
         """
@@ -152,19 +157,15 @@ class OVSWitch(LinuxApplication):
             self.ec.schedule(reschedule_delay, self.deploy)
 
         else:
             self.ec.schedule(reschedule_delay, self.deploy)
 
         else:
-            try:
-                self.discover()
-                self.provision()
-                self.check_sliver_ovs()
-                self.servers_on()
-                self.create_bridge()
-                self.assign_contr()
-                self.ovs_status()
-            except:
-                self._state = ResourceState.FAILED
-                raise
-                
-            self._state = ResourceState.READY
+            self.discover()
+            self.provision()
+            self.check_sliver_ovs()
+            self.servers_on()
+            self.create_bridge()
+            self.assign_contr()
+            self.ovs_status()
+            
+            super(OVSWitch, self).deploy()
 
     def servers_on(self):
         """ Start the openvswitch servers and also checking 
 
     def servers_on(self):
         """ Start the openvswitch servers and also checking 
@@ -201,9 +202,11 @@ class OVSWitch(LinuxApplication):
 
         # Check if the servers are running or not
         (out, err), proc = self.node.check_output(self.ovs_checks, 'status_srv_exitcode')
 
         # Check if the servers are running or not
         (out, err), proc = self.node.check_output(self.ovs_checks, 'status_srv_exitcode')
+        
         if out != "0\n":
             self.debug("Servers are not running")
             raise RuntimeError, msg
         if out != "0\n":
             self.debug("Servers are not running")
             raise RuntimeError, msg
+        
         self.info("Servers started")  
 
     def del_old_br(self):
         self.info("Servers started")  
 
     def del_old_br(self):
@@ -279,44 +282,37 @@ class OVSWitch(LinuxApplication):
         (out, err), proc = self.node.check_output(self.ovs_home, 'show_stdout')
         self.info(out)
 
         (out, err), proc = self.node.check_output(self.ovs_home, 'show_stdout')
         self.info(out)
 
-    def start(self):
-        """ Start the RM. It means nothing special for 
-            ovswitch for now.  
-        """
-        pass
-
-    def stop(self):
-        """ Stop the RM.It means nothing 
-            for ovswitch for now.
-        """
-        pass
-
     def release(self):
         """ Delete the bridge and 
             close the servers
         """
         # Node needs to wait until all associated RMs are released
         # to be released
     def release(self):
         """ Delete the bridge and 
             close the servers
         """
         # Node needs to wait until all associated RMs are released
         # to be released
-        from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
-        rm = self.get_connected(OVSPort.rtype())
+        try:
+            from nepi.resources.planetlab.openvswitch.ovsport import OVSPort
+            rm = self.get_connected(OVSPort.rtype())
 
 
-        if rm[0].state < ResourceState.FINISHED:
-            self.ec.schedule(reschedule_delay, self.release)
-            return 
+            if rm[0].state < ResourceState.FINISHED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
+                
+            msg = "Deleting the bridge %s" % self.get('bridge_name')
+            self.info(msg)
+            cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
+            (out, err), proc = self.node.run(cmd, self.ovs_checks,
+                    sudo = True)
+            cmd = "sliver-ovs stop"
+            (out, err), proc = self.node.run(cmd, self.ovs_checks,
+                    sudo = True)
             
             
-        msg = "Deleting the bridge %s" % self.get('bridge_name')
-        self.info(msg)
-        cmd = "sliver-ovs del-bridge %s" % self.get('bridge_name')
-        (out, err), proc = self.node.run(cmd, self.ovs_checks,
-                sudo = True)
-        cmd = "sliver-ovs stop"
-        (out, err), proc = self.node.run(cmd, self.ovs_checks,
-                sudo = True)
-        
-        if proc.poll():
-            self.fail()
-            self.error(msg, out, err)
-            raise RuntimeError, msg
-     
-        self._state = ResourceState.RELEASED
-        
+            if proc.poll():
+                self.fail()
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
+
+        super(OVSWitch, self).release()
+
index d3e9d79..a7155fb 100644 (file)
@@ -19,7 +19,8 @@
 #            Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 #            Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState       
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, failtrap
 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch        
 from nepi.resources.planetlab.node import PlanetlabNode        
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch        
 from nepi.resources.planetlab.node import PlanetlabNode        
 from nepi.resources.linux.application import LinuxApplication
@@ -178,16 +179,7 @@ class OVSPort(LinuxApplication):
         command = self.replace_paths(command)
         return command
         
         command = self.replace_paths(command)
         return command
         
-    def provision(self):
-        """ Provision the ports.No meaning.
-        """
-        pass
-
-    def discover(self):
-        """ Discover the ports.No meaning
-        """    
-        pass
-
+    @failtrap
     def deploy(self):
         """ Wait until ovswitch is started
         """
     def deploy(self):
         """ Wait until ovswitch is started
         """
@@ -197,50 +189,39 @@ class OVSPort(LinuxApplication):
             self.ec.schedule(reschedule_delay, self.deploy)
             
         else:
             self.ec.schedule(reschedule_delay, self.deploy)
             
         else:
-            try:
-                self.discover()
-                self.provision()
-                self.get_host_ip()
-                self.create_port()
-                self.get_local_end()
-                self.ovswitch.ovs_status()
-                self._state = ResourceState.READY
-            except:
-                self._state = ResourceState.FAILED
-                raise
-
-    def start(self):
-        """ Start the RM. It means nothing special for 
-            ovsport for now.
-        """
-        pass
-       
-    def stop(self):
-        """ Stop the RM. It means nothing special for 
-            ovsport for now.        
-        """
-        pass
-        
+            self.discover()
+            self.provision()
+            self.get_host_ip()
+            self.create_port()
+            self.get_local_end()
+            self.ovswitch.ovs_status()
+            super(OVSPort, self).deploy()
+
     def release(self):
         """ Release the port RM means delete the ports
         """
         # OVS needs to wait until all associated RMs are released
         # to be released
     def release(self):
         """ Release the port RM means delete the ports
         """
         # OVS needs to wait until all associated RMs are released
         # to be released
-        from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
-        rm = self.get_connected(Tunnel.rtype())
-        if rm and rm[0].state < ResourceState.FINISHED:
-            self.ec.schedule(reschedule_delay, self.release)
-            return 
-            
-        msg = "Deleting the port %s" % self.get('port_name')
-        self.info(msg)
-        cmd = "sliver-ovs del_port %s" % self.get('port_name')
-        (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
-                sudo = True)
-
-        if proc.poll():
-            self.fail()
-            self.error(msg, out, err)
-            raise RuntimeError, msg
-
-        self._state = ResourceState.RELEASED
+        try:
+            from nepi.resources.planetlab.openvswitch.tunnel import Tunnel
+            rm = self.get_connected(Tunnel.rtype())
+            if rm and rm[0].state < ResourceState.FINISHED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
+                
+            msg = "Deleting the port %s" % self.get('port_name')
+            self.info(msg)
+            cmd = "sliver-ovs del_port %s" % self.get('port_name')
+            (out, err), proc = self.node.run(cmd, self.ovswitch.ovs_checks,
+                    sudo = True)
+
+            if proc.poll():
+                self.fail()
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
+
+        super(OVSPort, self).release()
index 72c6728..c1f81fe 100644 (file)
@@ -19,7 +19,8 @@
 #            Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
 
 from nepi.execution.attribute import Attribute, Flags, Types
 #            Alexandros Kouvakas <alexandros.kouvakas@gmail.com>
 
 from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState 
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode            
 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch   
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode            
 from nepi.resources.planetlab.openvswitch.ovs import OVSWitch   
@@ -29,11 +30,10 @@ import os
 import time
 import socket
 
 import time
 import socket
 
-
 reschedule_delay = "0.5s"
 
 @clsinit_copy                 
 reschedule_delay = "0.5s"
 
 @clsinit_copy                 
-class Tunnel(LinuxApplication):
+class OVSTunnel(LinuxApplication):
     """
     .. class:: Class Args :
       
     """
     .. class:: Class Args :
       
@@ -46,7 +46,7 @@ class Tunnel(LinuxApplication):
 
     """
     
 
     """
     
-    _rtype = "Tunnel"
+    _rtype = "OVSTunnel"
     _authorized_connections = ["OVSPort", "PlanetlabTap"]    
 
     @classmethod
     _authorized_connections = ["OVSPort", "PlanetlabTap"]    
 
     @classmethod
@@ -93,7 +93,7 @@ class Tunnel(LinuxApplication):
         :type guid: int
     
         """
         :type guid: int
     
         """
-        super(Tunnel, self).__init__(ec, guid)
+        super(OVSTunnel, self).__init__(ec, guid)
         self._home = "tunnel-%s" % self.guid
         self.port_info_tunl = []
         self._nodes = []
         self._home = "tunnel-%s" % self.guid
         self.port_info_tunl = []
         self._nodes = []
@@ -246,6 +246,7 @@ class Tunnel(LinuxApplication):
             self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
             self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
+
         msg = "Connection on host %s configured" \
             % self.node.get("hostname")
         self.info(msg)
         msg = "Connection on host %s configured" \
             % self.node.get("hostname")
         self.info(msg)
@@ -344,6 +345,7 @@ class Tunnel(LinuxApplication):
             self.info(msg)
             return                                                      
 
             self.info(msg)
             return                                                      
 
+    @failtrap
     def provision(self):
         """ Provision the tunnel
         """
     def provision(self):
         """ Provision the tunnel
         """
@@ -363,68 +365,41 @@ class Tunnel(LinuxApplication):
             (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
             switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
 
             (self._pid, self._ppid) = self.udp_connect(self.endpoint2, self.endpoint1)
             switch_connect = self.sw_host_connect(self.endpoint1, self.endpoint2)
 
-        self.debug("------- READY -------")
-        self._provision_time = tnow()
-        self._state = ResourceState.PROVISIONED
-
-    def discover(self):
-        """ Discover the tunnel
-
-        """    
-        pass
+        super(OVSTunnel, self).provision()
 
 
+    @failtrap
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
     def deploy(self):
         if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \
             (not self.endpoint2 or self.endpoint2.state < ResourceState.READY):
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                raise
-            self.debug("----- READY ---- ")
-            self._ready_time = tnow()
-            self._state = ResourceState.READY
-
-    def start(self):
-        """ Start the RM. It means nothing special for 
-            ovsport for now.
-        """
-        pass
-        
-       
-    def stop(self):
-        """ Stop the RM. It means nothing special for 
-            ovsport for now.        
-        """
-        pass
+            self.discover()
+            self.provision()
 
 
+            super(OVSTunnel, self).deploy()
     def release(self):
         """ Release the udp_tunnel on endpoint2.
             On endpoint1 means nothing special.        
         """
     def release(self):
         """ Release the udp_tunnel on endpoint2.
             On endpoint1 means nothing special.        
         """
-        if not self.check_endpoints():
-            # Kill the TAP devices
-            # TODO: Make more generic Release method of PLTAP
-            if self._pid and self._ppid:
-                self._nodes = self.get_node(self.endpoint2) 
-                (out, err), proc = self.node.kill(self._pid,
-                        self._ppid, sudo = True)
-            if err or proc.poll():
-                    # check if execution errors occurred
-                    msg = " Failed to delete TAP device"
-                    self.error(msg, err, err)
-                    self.fail()
-            
-        self._state = ResourceState.RELEASED
-
-
-
-
-
-
+        try:
+            if not self.check_endpoints():
+                # Kill the TAP devices
+                # TODO: Make more generic Release method of PLTAP
+                if self._pid and self._ppid:
+                    self._nodes = self.get_node(self.endpoint2) 
+                    (out, err), proc = self.node.kill(self._pid,
+                            self._ppid, sudo = True)
+                if err or proc.poll():
+                        # check if execution errors occurred
+                        msg = " Failed to delete TAP device"
+                        self.error(msg, err, err)
+                        self.fail()
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
+
+        super(OVSTunnel, self).release()
 
 
 
 
index 991fa99..d9cf17c 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import clsinit_copy, ResourceState, \
-        reschedule_delay
+        reschedule_delay, failtrap
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode
 from nepi.util.timefuncs import tnow, tdiffsec
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.planetlab.node import PlanetlabNode
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -147,6 +147,7 @@ class PlanetlabTap(LinuxApplication):
         if_name = self.wait_if_name()
         self.set("deviceName", if_name) 
 
         if_name = self.wait_if_name()
         self.set("deviceName", if_name) 
 
+    @failtrap
     def deploy(self):
         if not self.node or self.node.state < ResourceState.PROVISIONED:
             self.ec.schedule(reschedule_delay, self.deploy)
     def deploy(self):
         if not self.node or self.node.state < ResourceState.PROVISIONED:
             self.ec.schedule(reschedule_delay, self.deploy)
@@ -160,16 +161,13 @@ class PlanetlabTap(LinuxApplication):
             if not self.get("install"):
                 self.set("install", self._install)
 
             if not self.get("install"):
                 self.set("install", self._install)
 
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self.fail()
-                return
+            self.discover()
+            self.provision()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
+    @failtrap
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
     def start(self):
         if self.state == ResourceState.READY:
             command = self.get("command")
@@ -179,8 +177,9 @@ class PlanetlabTap(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.fail()
+            raise RuntimeError, msg
 
 
+    @failtrap
     def stop(self):
         command = self.get('command') or ''
         
     def stop(self):
         command = self.get('command') or ''
         
@@ -214,12 +213,17 @@ class PlanetlabTap(LinuxApplication):
     def release(self):
         # Node needs to wait until all associated RMs are released
         # to be released
     def release(self):
         # Node needs to wait until all associated RMs are released
         # to be released
-        from nepi.resources.linux.udptunnel import UdpTunnel
-        rms = self.get_connected(UdpTunnel.rtype())
-        for rm in rms:
-            if rm.state < ResourceState.STOPPED:
-                self.ec.schedule(reschedule_delay, self.release)
-                return 
+        try:
+            from nepi.resources.linux.udptunnel import UdpTunnel
+            rms = self.get_connected(UdpTunnel.rtype())
+            for rm in rms:
+                if rm.state < ResourceState.STOPPED:
+                    self.ec.schedule(reschedule_delay, self.release)
+                    return 
+        except:
+            import traceback
+            err = traceback.format_exc()
+            self.error(err)
 
         super(PlanetlabTap, self).release()
 
 
         super(PlanetlabTap, self).release()
 
index f5d39d7..b7caeac 100644 (file)
@@ -30,25 +30,15 @@ N_PROCS = None
 class WorkerThread(threading.Thread):
     class QUIT:
         pass
 class WorkerThread(threading.Thread):
     class QUIT:
         pass
-    class REASSIGNED:
-        pass
-    
+
     def run(self):
         while True:
             task = self.queue.get()
     def run(self):
         while True:
             task = self.queue.get()
-            if task is None:
-                self.done = True
-                self.queue.task_done()
-                continue
-            elif task is self.QUIT:
-                self.done = True
+
+            if task is self.QUIT:
                 self.queue.task_done()
                 break
                 self.queue.task_done()
                 break
-            elif task is self.REASSIGNED:
-                continue
-            else:
-                self.done = False
-            
+
             try:
                 try:
                     callable, args, kwargs = task
             try:
                 try:
                     callable, args, kwargs = task
@@ -61,41 +51,35 @@ class WorkerThread(threading.Thread):
             except:
                 traceback.print_exc(file = sys.stderr)
                 self.delayed_exceptions.append(sys.exc_info())
             except:
                 traceback.print_exc(file = sys.stderr)
                 self.delayed_exceptions.append(sys.exc_info())
-    
-    def waitdone(self):
-        while not self.queue.empty() and not self.done:
-            self.queue.join()
-    
+
     def attach(self, queue, rvqueue, delayed_exceptions):
     def attach(self, queue, rvqueue, delayed_exceptions):
-        if self.isAlive():
-            self.waitdone()
-            oldqueue = self.queue
         self.queue = queue
         self.rvqueue = rvqueue
         self.delayed_exceptions = delayed_exceptions
         self.queue = queue
         self.rvqueue = rvqueue
         self.delayed_exceptions = delayed_exceptions
-        if self.isAlive():
-            oldqueue.put(self.REASSIGNED)
-    
-    def detach(self):
-        if self.isAlive():
-            self.waitdone()
-            self.oldqueue = self.queue
-        self.queue = Queue.Queue()
-        self.rvqueue = None
-        self.delayed_exceptions = []
-    
-    def detach_signal(self):
-        if self.isAlive():
-            self.oldqueue.put(self.REASSIGNED)
-            del self.oldqueue
-        
+   
     def quit(self):
         self.queue.put(self.QUIT)
     def quit(self):
         self.queue.put(self.QUIT)
-        self.join()
 
 
-class ParallelMap(object):
+class ParallelRun(object):
     def __init__(self, maxthreads = None, maxqueue = None, results = True):
     def __init__(self, maxthreads = None, maxqueue = None, results = True):
+        self.maxqueue = maxqueue
+        self.maxthreads = maxthreads
+        
+        self.queue = Queue.Queue(self.maxqueue or 0)
+        
+        self.delayed_exceptions = []
+        
+        if results:
+            self.rvqueue = Queue.Queue()
+        else:
+            self.rvqueue = None
+    
+        self.initialize_workers()
+
+    def initialize_workers(self):
         global N_PROCS
         global N_PROCS
+
+        maxthreads = self.maxthreads
        
         # Compute maximum number of threads allowed by the system
         if maxthreads is None:
        
         # Compute maximum number of threads allowed by the system
         if maxthreads is None:
@@ -112,42 +96,30 @@ class ParallelMap(object):
         
         if maxthreads is None:
             maxthreads = 4
         
         if maxthreads is None:
             maxthreads = 4
-        
-        self.queue = Queue.Queue(maxqueue or 0)
-
-        self.delayed_exceptions = []
-        
-        if results:
-            self.rvqueue = Queue.Queue()
-        else:
-            self.rvqueue = None
-    
         self.workers = []
 
         # initialize workers
         for x in xrange(maxthreads):
         self.workers = []
 
         # initialize workers
         for x in xrange(maxthreads):
-            t = None
-            if t is None:
-                t = WorkerThread()
-                t.setDaemon(True)
-            else:
-                t.waitdone()
-
-            t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
-            self.workers.append(t)
+            worker = WorkerThread()
+            worker.attach(self.queue, self.rvqueue, self.delayed_exceptions)
+            worker.setDaemon(True)
+
+            self.workers.append(worker)
     
     def __del__(self):
         self.destroy()
     
     def __del__(self):
         self.destroy()
-    
+
+    def empty(self):
+        while True:
+            try:
+                self.queue.get(block = False)
+                self.queue.task_done()
+            except Queue.Empty:
+                break
+  
     def destroy(self):
     def destroy(self):
-        for worker in self.workers:
-            worker.waitdone()
-        for worker in self.workers:
-            worker.detach()
-        for worker in self.workers:
-            worker.detach_signal()
-        for worker in self.workers:
-            worker.quit()
+        self.join()
 
         del self.workers[:]
         
 
         del self.workers[:]
         
@@ -158,28 +130,21 @@ class ParallelMap(object):
         self.queue.put_nowait((callable, args, kwargs))
 
     def start(self):
         self.queue.put_nowait((callable, args, kwargs))
 
     def start(self):
-        for thread in self.workers:
-            if not thread.isAlive():
-                thread.start()
+        for worker in self.workers:
+            if not worker.isAlive():
+                worker.start()
     
     def join(self):
     
     def join(self):
-        for thread in self.workers:
-            # That's the sync signal
-            self.queue.put(None)
-            
+        # Wait until all queued tasks have been processed
         self.queue.join()
         self.queue.join()
-        for thread in self.workers:
-            thread.waitdone()
-        
-        if self.delayed_exceptions:
-            typ,val,loc = self.delayed_exceptions[0]
-            del self.delayed_exceptions[:]
-            raise typ,val,loc
-        
-        self.destroy()
+
+        for worker in self.workers:
+            worker.quit()
+
+        for worker in self.workers:
+            worker.join()
     
     def sync(self):
     
     def sync(self):
-        self.queue.join()
         if self.delayed_exceptions:
             typ,val,loc = self.delayed_exceptions[0]
             del self.delayed_exceptions[:]
         if self.delayed_exceptions:
             typ,val,loc = self.delayed_exceptions[0]
             del self.delayed_exceptions[:]
@@ -197,18 +162,3 @@ class ParallelMap(object):
                     except Queue.Empty:
                         raise StopIteration
             
                     except Queue.Empty:
                         raise StopIteration
             
-class ParallelRun(ParallelMap):
-    def __run(self, x):
-        fn, args, kwargs = x
-        return fn(*args, **kwargs)
-    
-    def __init__(self, maxthreads = None, maxqueue = None):
-        super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
-
-    def put(self, what, *args, **kwargs):
-        super(ParallelRun, self).put(self.__run, (what, args, kwargs))
-    
-    def put_nowait(self, what, *args, **kwargs):
-        super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
-
-
index 4602148..0304014 100755 (executable)
@@ -72,17 +72,23 @@ class ExecuteControllersTestCase(unittest.TestCase):
 
     def test_schedule_exception(self):
         def raise_error():
 
     def test_schedule_exception(self):
         def raise_error():
+            # When this task is executed and the error is raised,
+            # the FailureManager should set its failure level to
+            # TASK_FAILURE
             raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
 
         ec = ExperimentController()
             raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!"
 
         ec = ExperimentController()
-        ec.schedule("2s", raise_error)
 
 
-        while ec.ecstate not in [ECState.FAILED, ECState.TERMINATED]:
-           time.sleep(1)
+        tid = ec.schedule("2s", raise_error, track = True)
         
         
-        self.assertEquals(ec.ecstate, ECState.FAILED)
-        ec.shutdown()
+        while True:
+            task = ec.get_task(tid)
+            if task.status != TaskStatus.NEW:
+                break
+
+            time.sleep(1)
 
 
+        self.assertEquals(task.status, TaskStatus.ERROR)
 
 if __name__ == '__main__':
     unittest.main()
 
 if __name__ == '__main__':
     unittest.main()
index 091c43e..e594f26 100755 (executable)
 
 
 from nepi.execution.attribute import Attribute
 
 
 from nepi.execution.attribute import Attribute
-from nepi.execution.ec import ExperimentController 
-from nepi.execution.resource import ResourceManager, ResourceState, clsinit, \
-        ResourceAction
+from nepi.execution.ec import ExperimentController, FailureLevel 
+from nepi.execution.resource import ResourceManager, ResourceState, \
+        clsinit_copy, ResourceAction, failtrap
 
 import random
 import time
 import unittest
 
 
 import random
 import time
 import unittest
 
-@clsinit
+@clsinit_copy
 class MyResource(ResourceManager):
     _rtype = "MyResource"
 
 class MyResource(ResourceManager):
     _rtype = "MyResource"
 
@@ -40,14 +40,13 @@ class MyResource(ResourceManager):
     def __init__(self, ec, guid):
         super(MyResource, self).__init__(ec, guid)
 
     def __init__(self, ec, guid):
         super(MyResource, self).__init__(ec, guid)
 
-@clsinit
+@clsinit_copy
 class AnotherResource(ResourceManager):
     _rtype = "AnotherResource"
 
     def __init__(self, ec, guid):
         super(AnotherResource, self).__init__(ec, guid)
 
 class AnotherResource(ResourceManager):
     _rtype = "AnotherResource"
 
     def __init__(self, ec, guid):
         super(AnotherResource, self).__init__(ec, guid)
 
-
 class Channel(ResourceManager):
     _rtype = "Channel"
 
 class Channel(ResourceManager):
     _rtype = "Channel"
 
@@ -89,7 +88,7 @@ class Node(ResourceManager):
             self.discover()
             self.provision()
             self.logger.debug(" -------- PROVISIONED ------- ")
             self.discover()
             self.provision()
             self.logger.debug(" -------- PROVISIONED ------- ")
-            self.ec.schedule("3s", self.deploy)
+            self.ec.schedule("1s", self.deploy)
         elif self.state == ResourceState.PROVISIONED:
             ifaces = self.get_connected(Interface.rtype())
             for rm in ifaces:
         elif self.state == ResourceState.PROVISIONED:
             ifaces = self.get_connected(Interface.rtype())
             for rm in ifaces:
@@ -111,15 +110,29 @@ class Application(ResourceManager):
         if node.state < ResourceState.READY:
             self.ec.schedule("0.5s", self.deploy)
         else:
         if node.state < ResourceState.READY:
             self.ec.schedule("0.5s", self.deploy)
         else:
-            time.sleep(random.random() * 5)
+            time.sleep(random.random() * 2)
             super(Application, self).deploy()
             self.logger.debug(" -------- DEPLOYED ------- ")
 
     def start(self):
         super(Application, self).start()
             super(Application, self).deploy()
             self.logger.debug(" -------- DEPLOYED ------- ")
 
     def start(self):
         super(Application, self).start()
-        time.sleep(random.random() * 5)
+        time.sleep(random.random() * 3)
         self._state = ResourceState.FINISHED
         self._state = ResourceState.FINISHED
-   
+
+class ErrorApplication(ResourceManager):
+    _rtype = "ErrorApplication"
+
+    def __init__(self, ec, guid):
+        super(ErrorApplication, self).__init__(ec, guid)
+
+    @failtrap
+    def deploy(self):
+        node = self.get_connected(Node.rtype())[0]
+        if node.state < ResourceState.READY:
+            self.ec.schedule("0.5s", self.deploy)
+        else:
+            time.sleep(random.random() * 2)
+            raise RuntimeError, "NOT A REAL ERROR. JUST TESTING"
 
 class ResourceFactoryTestCase(unittest.TestCase):
     def test_add_resource_factory(self):
 
 class ResourceFactoryTestCase(unittest.TestCase):
     def test_add_resource_factory(self):
@@ -130,13 +143,13 @@ class ResourceFactoryTestCase(unittest.TestCase):
         ResourceFactory.register_type(AnotherResource)
 
         self.assertEquals(MyResource.rtype(), "MyResource")
         ResourceFactory.register_type(AnotherResource)
 
         self.assertEquals(MyResource.rtype(), "MyResource")
-        self.assertEquals(len(MyResource._attributes), 1)
+        self.assertEquals(len(MyResource._attributes), 2)
 
         self.assertEquals(ResourceManager.rtype(), "Resource")
 
         self.assertEquals(ResourceManager.rtype(), "Resource")
-        self.assertEquals(len(ResourceManager._attributes), 0)
+        self.assertEquals(len(ResourceManager._attributes), 1)
 
         self.assertEquals(AnotherResource.rtype(), "AnotherResource")
 
         self.assertEquals(AnotherResource.rtype(), "AnotherResource")
-        self.assertEquals(len(AnotherResource._attributes), 0)
+        self.assertEquals(len(AnotherResource._attributes), 1)
 
         self.assertEquals(len(ResourceFactory.resource_types()), 2)
         
 
         self.assertEquals(len(ResourceFactory.resource_types()), 2)
         
@@ -274,15 +287,43 @@ class ResourceManagerTestCase(unittest.TestCase):
 
         ec.shutdown()
 
 
         ec.shutdown()
 
-    def test_start_with_condition(self):
+    def test_exception(self):
+        from nepi.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(ErrorApplication)
+        ResourceFactory.register_type(Node)
+        ResourceFactory.register_type(Interface)
+        ResourceFactory.register_type(Channel)
+
+        ec = ExperimentController()
+
+        node = ec.register_resource("Node")
+
+        apps = list()
+        for i in xrange(10):
+            app = ec.register_resource("ErrorApplication")
+            ec.register_connection(app, node)
+            apps.append(app)
+
+
+        ec.deploy()
+
+        ec.wait_finished(apps)
+
+        ec.shutdown()
+
+        self.assertTrue(ec._fm._failure_level == FailureLevel.RM_FAILURE)
+
+
+    def ztest_start_with_condition(self):
         # TODO!!!
         pass
     
         # TODO!!!
         pass
     
-    def test_stop_with_condition(self):
+    def ztest_stop_with_condition(self):
         # TODO!!!
         pass
 
         # TODO!!!
         pass
 
-    def test_set_with_condition(self):
+    def ztest_set_with_condition(self):
         # TODO!!!
         pass
 
         # TODO!!!
         pass
 
index 0cb6c20..d072074 100644 (file)
@@ -112,15 +112,15 @@ class OvsTestCase(unittest.TestCase):
         ec.set(tap2, "prefix4", 24)
         ec.register_connection(tap2, node4)
 
         ec.set(tap2, "prefix4", 24)
         ec.register_connection(tap2, node4)
 
-        ovstun1 = ec.register_resource("Tunnel")
+        ovstun1 = ec.register_resource("OVSTunnel")
         ec.register_connection(port1, ovstun1)
         ec.register_connection(tap1, ovstun1)
 
         ec.register_connection(port1, ovstun1)
         ec.register_connection(tap1, ovstun1)
 
-        ovstun2 = ec.register_resource("Tunnel")
+        ovstun2 = ec.register_resource("OVSTunnel")
         ec.register_connection(port3, ovstun2)
         ec.register_connection(tap2, ovstun2)
 
         ec.register_connection(port3, ovstun2)
         ec.register_connection(tap2, ovstun2)
 
-        ovstun3 = ec.register_resource("Tunnel")
+        ovstun3 = ec.register_resource("OVSTunnel")
         ec.register_connection(port2, ovstun3)
         ec.register_connection(port4, ovstun3)
 
         ec.register_connection(port2, ovstun3)
         ec.register_connection(port4, ovstun3)
 
diff --git a/test/util/parallel.py b/test/util/parallel.py
new file mode 100644 (file)
index 0000000..7f8f5a0
--- /dev/null
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+
+from nepi.util.parallel import ParallelRun
+
+import datetime
+import unittest
+
+class ParallelRunTestCase(unittest.TestCase):
+    def test_run_simple(self):
+        runner = ParallelRun(maxthreads = 4)
+        runner.start()
+
+        count = [0]
+
+        def inc(count):
+            count[0] += 1
+        
+        for x in xrange(10):
+            runner.put(inc, count)
+
+        runner.destroy()
+
+        self.assertEquals(count[0], 10)
+
+    def test_run_interrupt(self):
+
+        def sleep():
+            import time
+            time.sleep(5)
+
+        startt = datetime.datetime.now()
+
+        runner = ParallelRun(maxthreads = 4)
+        runner.start()
+       
+        for x in xrange(100):
+            runner.put(sleep)
+
+        runner.empty()
+        runner.destroy()
+
+        endt = datetime.datetime.now()
+        time_elapsed = (endt - startt).seconds
+        self.assertTrue( time_elapsed < 500)
+
+    def test_run_error(self):
+        count = [0]
+
+        def inc(count):
+            count[0] += 1
+        def error():
+            raise RuntimeError()
+
+        runner = ParallelRun(maxthreads = 4)
+        runner.start()
+       
+        for x in xrange(4):
+            runner.put(inc, count)
+
+        runner.put(error)
+       
+        runner.destroy()
+      
+        self.assertEquals(count[0], 4)
+        
+        self.assertRaises(RuntimeError, runner.sync)
+
+if __name__ == '__main__':
+    unittest.main()
+