ec_shutdown
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 7 Oct 2013 06:59:47 +0000 (08:59 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 7 Oct 2013 06:59:47 +0000 (08:59 +0200)
19 files changed:
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnapplication.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/interface.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/udptest.py
src/nepi/resources/linux/udptunnel.py
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/tap.py
src/nepi/util/parallel.py

index e59e626..30dd7a4 100644 (file)
@@ -36,6 +36,51 @@ import sys
 import time
 import threading
 
+class FailurePolicy(object):
+    """ Defines how to respond to experiment failures  
+    """
+    IGNORE_RM_FAILURE = 1
+    ABORT_ON_RM_FAILURE = 2
+
+class FailureLevel(object):
+    """ Describe the system failure state
+    """
+    OK = 1
+    RM_FAILURE = 2
+    TASK_FAILURE = 3
+    EC_FAILURE = 4
+
+class FailureManager(object):
+    """ The FailureManager is responsible for handling errors,
+    and deciding whether an experiment should be aborted
+    """
+
+    def __init__(self, failure_policy = None):
+        self._failure_level = FailureLevel.OK
+        self._failure_policy = failure_policy or \
+                FailurePolicy.ABORT_ON_RM_FAILURE
+
+    @property
+    def abort(self):
+        if self._failure_level == FailureLevel.EC_FAILURE:
+            return True
+
+        if self._failure_level in [FailureLevel.TASK_FAILURE, 
+                FailureLevel.RM_FAILURE] and \
+                        self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
+            return True
+
+        return False
+
+    def set_rm_failure(self):
+        self._failure_level = FailureLevel.RM_FAILURE
+
+    def set_task_failure(self):
+        self._failure_level = FailureLevel.TASK_FAILURE
+
+    def set_ec_failure(self):
+        self._failure_level = FailureLevel.EC_FAILURE
+
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -49,19 +94,23 @@ class ExperimentController(object):
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
-                       It will be used in the name of the directory 
-                        where experiment related information is stored
         :type exp_id: str
 
     .. note::
 
-        An experiment, or scenario, is defined by a concrete use, behavior,
-        configuration and interconnection of resources that describe a single
-        experiment case (We call this the experiment description). 
-        A same experiment (scenario) can be run many times.
+        An experiment, or scenario, is defined by a concrete set of resources,
+        behavior, configuration and interconnection of those resources. 
+        The Experiment Description (ED) is a detailed representation of a
+        single experiment. It contains all the necessary information to 
+        allow repeating the experiment. NEPI allows to describe
+        experiments by registering components (resources), configuring them
+        and interconnecting them.
+        
+        A same experiment (scenario) can be executed many times, generating 
+        different results. We call an experiment execution (instance) a 'run'.
 
-        The ExperimentController (EC), is the entity responsible for 
-        managing an experiment instance (run). The same scenario can be 
+        The ExperimentController (EC), is the entity responsible of
+        managing an experiment run. The same scenario can be 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
@@ -75,15 +124,15 @@ class ExperimentController(object):
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
-        In order for a new type of resource to be supported in NEPI
-        a new RM must be implemented. NEPI already provides different
+        To support a new type of resource in NEPI, a new RM must be 
+        implemented. NEPI already provides a variety of
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
-        configure them and interconnect them, in order to describe an experiment.
+        configure them and interconnect them, to describe an experiment.
         Describing an experiment through the EC does not run the experiment.
-        Only when the 'deploy()' method is invoked on the EC, will the EC take 
+        Only when the 'deploy()' method is invoked on the EC, the EC will take 
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
@@ -97,8 +146,8 @@ class ExperimentController(object):
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
-        exp_id, which can be re-used by different ExperimentController instances,
-        and the run_id, which unique to a ExperimentController instance, and
+        exp_id, which can be re-used in different ExperimentController,
+        and the run_id, which is unique to one ExperimentController instance, and
         is automatically generated by NEPI.
         
     """
@@ -108,12 +157,16 @@ class ExperimentController(object):
         # Logging
         self._logger = logging.getLogger("ExperimentController")
 
-        # Run identifier. It identifies a concrete instance (run) of an experiment.
-        # Since a same experiment (same configuration) can be run many times,
-        # this id permits to identify concrete exoeriment run
+        # Run identifier. It identifies a concrete execution instance (run) 
+        # of an experiment.
+        # Since a same experiment (same configuration) can be executed many 
+        # times, this run_id permits to separate result files generated on 
+        # different experiment executions
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
+        # Identifies the experiment scenario (i.e. configuration, 
+        # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
@@ -128,7 +181,7 @@ class ExperimentController(object):
         # Tasks
         self._tasks = dict()
 
-        # RM groups 
+        # RM groups (for deployment) 
         self._groups = dict()
 
         # generator of globally unique id for groups
@@ -140,6 +193,12 @@ class ExperimentController(object):
         self._thread.setDaemon(True)
         self._thread.start()
 
+        # Flag to stop processing thread
+        self._stop = False
+    
+        # Entity in charge of managing system failures
+        self._fm = FailureManager()
+
         # EC state
         self._state = ECState.RUNNING
 
@@ -172,69 +231,86 @@ class ExperimentController(object):
         return self._run_id
 
     @property
-    def finished(self):
-        """ Put the state of the Experiment Controller into a final state :
-            Either TERMINATED or FAILED
+    def abort(self):
+        return self._fm.abort
 
-        """
-        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+    def set_rm_failure(self):
+        self._fm.set_rm_failure()
 
     def wait_finished(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
+            RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
+
         """
-        return self.wait(guids)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STOPPED, 
+                quit = quit)
 
     def wait_started(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STARTED or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, state = ResourceState.STARTED)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STARTED, 
+                quit = quit)
 
     def wait_released(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state RELEASED (or FAILED)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state = RELEASED or until the EC fails
 
         :param guids: List of guids
         :type guids: list
         """
-        # TODO: solve state concurrency BUG and !!!!
-        # correct waited release state to state = ResourceState.FAILED)
-        return self.wait(guids, state = ResourceState.FINISHED)
+
+        def quit():
+            return self._state == ECState.FAILED
+
+        return self.wait(guids, state = ResourceState.RELEASED, 
+                quit = quit)
 
     def wait_deployed(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state READY (or any higher state)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= READY or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, state = ResourceState.READY)
 
-    def wait(self, guids, state = ResourceState.STOPPED):
-        """ Blocking method that waits until all the RM from the 'guid' list 
-            reached state 'state' or until a failure occurs
-            
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.READY, 
+                quit = quit)
+
+    def wait(self, guids, state, quit):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= 'state' or until quit yileds True
+           
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        # we randomly alter the order of the guids to avoid ordering
-        # dependencies (e.g. LinuxApplication RMs runing on the same
-        # linux host will be synchronized by the LinuxNode SSH lock)
-        random.shuffle(guids)
-
         while True:
-            # If no more guids to wait for or an error occured, then exit
-            if len(guids) == 0 or self.finished:
+            # If there are no more guids to wait for
+            # or the quit function returns True, exit the loop
+            if len(guids) == 0 or quit():
                 break
 
             # If a guid reached one of the target states, remove it from list
@@ -245,26 +321,11 @@ class ExperimentController(object):
                 guids.remove(guid)
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
-                    self.state(guid, hr = True), state))
-
-                # Take the opportunity to 'refresh' the states of the RMs.
-                # Query only the first up to N guids (not to overwhelm 
-                # the local machine)
-                n = 100
-                lim = n if len(guids) > n else ( len(guids) -1 )
-                nguids = guids[0: lim]
-
-                # schedule state request for all guids (take advantage of
-                # scheduler multi threading).
-                for guid in nguids:
-                    callback = functools.partial(self.state, guid)
-                    self.schedule("0s", callback)
-
-                # If the guid is not in one of the target states, wait and
-                # continue quering. We keep the sleep big to decrease the
-                # number of RM state queries
-                time.sleep(4)
+                hrstate = ResourceState2str.get(rstate)
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+                    guid, rstate, state))
+
+            time.sleep(0.5)
   
     def get_task(self, tid):
         """ Get a specific task
@@ -574,7 +635,7 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not guids:
-            # If no guids list was indicated, all 'NEW' RMs will be deployed
+            # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
@@ -584,6 +645,7 @@ class ExperimentController(object):
             guids = [guids]
 
         # Create deployment group
+        # New guids can be added to a same deployment group later on
         new_group = False
         if not group:
             new_group = True
@@ -594,20 +656,9 @@ class ExperimentController(object):
 
         self._groups[group].extend(guids)
 
-        # Before starting deployment we disorder the guids list with the
-        # purpose of speeding up the whole deployment process.
-        # It is likely that the user inserted in the 'guids' list closely
-        # resources one after another (e.g. all applications
-        # connected to the same node can likely appear one after another).
-        # This can originate a slow down in the deployment since the N 
-        # threads the parallel runner uses to processes tasks may all
-        # be taken up by the same family of resources waiting for the 
-        # same conditions (e.g. LinuxApplications running on a same 
-        # node share a single lock, so they will tend to be serialized).
-        # If we disorder the guids list, this problem can be mitigated.
-        random.shuffle(guids)
-
         def wait_all_and_start(group):
+            # Function that checks if all resources are READY
+            # before scheduling a start_with_conditions for each RM
             reschedule = False
             
             # Get all guids in group
@@ -630,9 +681,9 @@ class ExperimentController(object):
         if wait_all_ready and new_group:
             # Schedule a function to check that all resources are
             # READY, and only then schedule the start.
-            # This aimes at reducing the number of tasks looping in the 
+            # This aims at reducing the number of tasks looping in the 
             # scheduler. 
-            # Intead of having N start tasks, we will have only one for 
+            # Instead of having many start tasks, we will have only one for 
             # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             self.schedule("1s", callback)
@@ -672,11 +723,17 @@ class ExperimentController(object):
         Releases all the resources and stops task processing thread
 
         """
+        # TODO: Clean the parallel runner!! STOP all ongoing tasks
+        ####
+
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
+        # Stop processing thread
+        self._stop = True
+
         # Notify condition to wake up the processing thread
         self._notify()
         
@@ -754,8 +811,8 @@ class ExperimentController(object):
         runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
-        try:
-            while not self.finished:
+        while not self._stop:
+            try:
                 self._cond.acquire()
 
                 task = self._scheduler.next()
@@ -784,16 +841,20 @@ class ExperimentController(object):
                 if task:
                     # Process tasks in parallel
                     runner.put(self._execute, task)
-        except: 
-            import traceback
-            err = traceback.format_exc()
-            self.logger.error("Error while processing tasks in the EC: %s" % err)
+            except: 
+                import traceback
+                err = traceback.format_exc()
+                self.logger.error("Error while processing tasks in the EC: %s" % err)
 
-            self._state = ECState.FAILED
-        finally:   
-            self.logger.debug("Exiting the task processing loop ... ")
-            runner.sync()
-            runner.destroy()
+                # Set the EC to FAILED state 
+                self._state = ECState.FAILED
+
+                # Set the FailureManager failure level
+                self._fm.set_ec_failure()
+
+        self.logger.debug("Exiting the task processing loop ... ")
+        runner.sync()
+        runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
@@ -821,15 +882,8 @@ class ExperimentController(object):
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
-            # Set the EC to FAILED state (this will force to exit the task
-            # processing thread)
-            self._state = ECState.FAILED
-
-            # Notify condition to wake up the processing thread
-            self._notify()
-
-            # Propage error to the ParallelRunner
-            raise
+            # Set the FailureManager failure level
+            self._fm.set_task_failure()
 
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
index a9087ae..18edeb0 100644 (file)
@@ -403,6 +403,7 @@ class ResourceManager(Logger):
  
     def fail(self):
         self.set_failed()
+        self.ec.set_rm_failure()
 
     def set(self, name, value):
         """ Set the value of the attribute
@@ -741,7 +742,6 @@ class ResourceManager(Logger):
             self.debug("----- STARTING ---- ")
             self.deploy()
 
-
     def connect(self, guid):
         """ Performs actions that need to be taken upon associating RMs.
         This method should be redefined when necessary in child classes.
index 4f4b64e..763106c 100644 (file)
@@ -301,7 +301,7 @@ class LinuxApplication(ResourceManager):
         # Since provisioning takes a long time, before
         # each step we check that the EC is still 
         for step in steps:
-            if self.ec.finished:
+            if self.ec.abort:
                 raise RuntimeError, "EC finished"
             
             ret = step()
@@ -484,7 +484,7 @@ class LinuxApplication(ResourceManager):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
 
             super(LinuxApplication, self).deploy()
     
@@ -499,10 +499,14 @@ class LinuxApplication(ResourceManager):
             self.set_finished()
         else:
 
-            if self.in_foreground:
-                self._run_in_foreground()
-            else:
-                self._run_in_background()
+            try:
+                if self.in_foreground:
+                    self._run_in_foreground()
+                else:
+                    self._run_in_background()
+            except:
+                self.fail()
+                return
 
             super(LinuxApplication, self).start()
 
@@ -530,7 +534,6 @@ class LinuxApplication(ResourceManager):
                 blocking = False)
 
         if self._proc.poll():
-            self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
 
@@ -560,7 +563,6 @@ class LinuxApplication(ResourceManager):
         msg = " Failed to start command '%s' " % command
         
         if proc.poll():
-            self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
     
@@ -577,7 +579,6 @@ class LinuxApplication(ResourceManager):
 
             # Out is what was written in the stderr file
             if err:
-                self.fail()
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
@@ -609,8 +610,8 @@ class LinuxApplication(ResourceManager):
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                         self.fail()
+                        return
         
-        if self.state == ResourceState.STARTED:
             super(LinuxApplication, self).stop()
 
     def release(self):
@@ -622,10 +623,7 @@ class LinuxApplication(ResourceManager):
 
         self.stop()
 
-        if self.state != ResourceState.FAILED:
-            self.info("Resource released")
-
-            super(LinuxApplication, self).release()
+        super(LinuxApplication, self).release()
    
     @property
     def state(self):
index 3d8943a..46a3cc4 100644 (file)
@@ -61,7 +61,7 @@ class LinuxCCNApplication(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
  
             self.debug("----- READY ---- ")
             self.set_ready()
index cae55b5..1fa93cc 100644 (file)
@@ -98,8 +98,8 @@ class LinuxCCNContent(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return 
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -121,7 +121,6 @@ class LinuxCCNContent(LinuxApplication):
                 env, blocking = True)
 
         if proc.poll():
-            self.fail()
             msg = "Failed to execute command"
             self.error(msg, out, err)
             raise RuntimeError, msg
@@ -136,7 +135,6 @@ class LinuxCCNContent(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             sef.fail()
-            raise RuntimeError, msg
 
     @property
     def _start_command(self):
index 4fdb0ce..71eb12a 100644 (file)
@@ -178,8 +178,8 @@ class LinuxCCND(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -211,8 +211,7 @@ class LinuxCCND(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.set_failed()
-            raise RuntimeError, msg
+            self.fail()
 
     def stop(self):
         command = self.get('command') or ''
index 378f93c..46c3f3b 100644 (file)
@@ -222,8 +222,8 @@ class LinuxCCNR(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return 
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -265,7 +265,6 @@ class LinuxCCNR(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
-            raise RuntimeError, msg
 
     @property
     def _start_command(self):
index aad03ca..48c2d6d 100644 (file)
@@ -136,8 +136,8 @@ class LinuxFIBEntry(LinuxApplication):
                 self.configure()
             except:
                 self.fail()
-                raise
+                return
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -161,7 +161,6 @@ class LinuxFIBEntry(LinuxApplication):
         if proc.poll():
             msg = "Failed to execute command"
             self.error(msg, out, err)
-            self.fail()
             raise RuntimeError, msg
         
     def configure(self):
@@ -205,7 +204,6 @@ class LinuxFIBEntry(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
-            raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command')
@@ -224,7 +222,6 @@ class LinuxFIBEntry(LinuxApplication):
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
                 self.fail()
-                raise RuntimeError, msg
 
     @property
     def _start_command(self):
index 26a1549..a170f41 100644 (file)
@@ -243,7 +243,7 @@ class LinuxInterface(ResourceManager):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return 
 
             super(LinuxInterface, self).deploy()
 
index b50d7fa..9cab4ca 100644 (file)
@@ -326,7 +326,6 @@ class LinuxNode(ResourceManager):
     def provision(self):
         # check if host is alive
         if not self.is_alive():
-            self.fail()
             
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
@@ -361,7 +360,7 @@ class LinuxNode(ResourceManager):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
index 779b8c1..1b7ac9c 100644 (file)
@@ -259,7 +259,6 @@ class LinuxUdpTest(LinuxApplication):
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
                 self.fail()
-                raise RuntimeError, msg
         else:
             super(LinuxUdpTest, self).start()
  
index 82f0139..4dae96c 100644 (file)
@@ -141,7 +141,6 @@ class UdpTunnel(LinuxApplication):
         msg = " Failed to connect endpoints "
         
         if proc.poll():
-            self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
     
@@ -154,7 +153,6 @@ class UdpTunnel(LinuxApplication):
             (out, err), proc = endpoint.node.check_errors(self.run_home(endpoint))
             # Out is what was written in the stderr file
             if err:
-                self.fail()
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
@@ -202,7 +200,7 @@ class UdpTunnel(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
  
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -217,7 +215,6 @@ class UdpTunnel(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
-            raise RuntimeError, msg
 
     def stop(self):
         """ Stops application execution
@@ -238,8 +235,8 @@ class UdpTunnel(LinuxApplication):
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
                     self.fail()
+                    return
 
-        if self.state == ResourceState.STARTED:
             self.set_stopped()
 
     @property
@@ -311,7 +308,6 @@ class UdpTunnel(LinuxApplication):
         else:
             msg = "Couldn't retrieve %s" % filename
             self.error(msg, out, err)
-            self.fail()
             raise RuntimeError, msg
 
         return result
index 673f810..0bc0c62 100644 (file)
@@ -195,7 +195,7 @@ class OMFApplication(ResourceManager):
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
-            #raise
+            return
 
         super(OMFApplication, self).stop()
 
index b51cd89..ccd67be 100644 (file)
@@ -163,7 +163,7 @@ class OMFChannel(ResourceManager):
             msg = "Channel's value is not initialized"
             self.error(msg)
             self.fail()
-            raise
+            return
 
         self._nodes_guid = self._get_target(self._connections) 
         if self._nodes_guid == "reschedule" :
@@ -180,7 +180,7 @@ class OMFChannel(ResourceManager):
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
-            raise
+            return
 
         super(OMFChannel, self).deploy()
 
index a3b9c3a..1fb1762 100644 (file)
@@ -102,7 +102,6 @@ class OMFWifiInterface(ResourceManager):
             msg = "Connection between %s %s and %s %s accepted" % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
-
             return True
 
         msg = "Connection between %s %s and %s %s refused" % \
@@ -123,7 +122,6 @@ class OMFWifiInterface(ResourceManager):
         if rm_list: return rm_list[0]
         return None
 
-
     def configure_iface(self):
         """ Configure the interface without the ip
 
@@ -139,13 +137,11 @@ class OMFWifiInterface(ResourceManager):
                 self._omf_api.configure(self.node.get('hostname'), attrname, 
                         attrval)
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.debug(msg)
-            #raise
+            self.error(msg)
+            raise
         
         super(OMFWifiInterface, self).provision()
-        return True
 
     def configure_ip(self):
         """ Configure the ip of the interface
@@ -162,23 +158,21 @@ class OMFWifiInterface(ResourceManager):
                     attrval)
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.debug(msg)
-            self.fail()
-            #raise
+            self.error(msg)
+            raise
 
-        return True
 
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the interface.
         It becomes DEPLOYED after sending messages to configure the interface
         """
-        if not self._omf_api :
+        if not self._omf_api:
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
-        if not self._omf_api :
+        if not self._omf_api:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
@@ -189,13 +183,13 @@ class OMFWifiInterface(ResourceManager):
             msg = "Interface's variable are not initialized"
             self.error(msg)
             self.fail()
-            return False
+            return
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
             self.fail()
-            return False
+            return
 
         # Just for information
         self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
@@ -203,23 +197,21 @@ class OMFWifiInterface(ResourceManager):
             self.get('essid') + " : " + self.get('ip'))
     
         # Check if the node is already deployed
-        chk1 = True
-        if self.state < ResourceState.PROVISIONED:
-            chk1 = self.configure_iface()
-        if chk1:
-            chk2 = self.configure_ip()
+        try:
+            if self.state < ResourceState.PROVISIONED:
+                if self.configure_iface():
+                    self.configure_ip()
+        except:
+            self.fail()
+            return
 
-        if not (chk1 and chk2) :
-            return False
-            
         super(OMFWifiInterface, self).deploy()
-        return True
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
-        if self._omf_api :
+        if self._omf_api:
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
index 1421078..570afd0 100644 (file)
@@ -130,22 +130,22 @@ class OMFNode(ResourceManager):
             It becomes DEPLOYED after sending messages to enroll the node
 
         """ 
-        if not self._omf_api :
+        if not self._omf_api:
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
-        if not self._omf_api :
+        if not self._omf_api:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
             return
 
-        if not self.get('hostname') :
+        if not self.get('hostname'):
             msg = "Hostname's value is not initialized"
             self.error(msg)
             self.fail()
-            return False
+            return
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
@@ -153,7 +153,7 @@ class OMFNode(ResourceManager):
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
-            #raise AttributeError, msg
+            return
 
         super(OMFNode, self).deploy()
 
@@ -174,7 +174,6 @@ class OMFNode(ResourceManager):
            It becomes STARTED as soon as this method starts.
 
         """
-
         super(OMFNode, self).start()
 
     def stop(self):
@@ -188,7 +187,7 @@ class OMFNode(ResourceManager):
         """Clean the RM at the end of the experiment
 
         """
-        if self._omf_api :
+        if self._omf_api:
             self._omf_api.release(self.get('hostname'))
 
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
index 3cd5b54..b26a4f3 100644 (file)
@@ -193,7 +193,6 @@ class PlanetlabNode(LinuxNode):
         cls._register_attribute(min_cpu)
         cls._register_attribute(max_cpu)
         cls._register_attribute(timeframe)
-        
 
     def __init__(self, ec, guid):
         super(PlanetlabNode, self).__init__(ec, guid)
index c4867b2..991fa99 100644 (file)
@@ -165,8 +165,8 @@ class PlanetlabTap(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -180,7 +180,6 @@ class PlanetlabTap(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
-            raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command') or ''
@@ -206,8 +205,7 @@ class PlanetlabTap(LinuxApplication):
 
                 if out.strip().find(self.get("deviceName")) == -1: 
                     # tap is not running is not running (socket not found)
-                    self._finish_time = tnow()
-                    self._state = ResourceState.FINISHED
+                    self.finish()
 
             self._last_state_check = tnow()
 
@@ -243,7 +241,6 @@ class PlanetlabTap(LinuxApplication):
         else:
             msg = "Couldn't retrieve if_name"
             self.error(msg, out, err)
-            self.fail()
             raise RuntimeError, msg
 
         return if_name
index 3ca20cd..6868c4a 100644 (file)
@@ -28,9 +28,6 @@ import os
 
 N_PROCS = None
 
-#THREADCACHE = []
-#THREADCACHEPID = None
-
 class WorkerThread(threading.Thread):
     class QUIT:
         pass
@@ -100,9 +97,8 @@ class WorkerThread(threading.Thread):
 class ParallelMap(object):
     def __init__(self, maxthreads = None, maxqueue = None, results = True):
         global N_PROCS
-        #global THREADCACHE
-        #global THREADCACHEPID
-        
+       
+        # Compute maximum number of threads allowed by the system
         if maxthreads is None:
             if N_PROCS is None:
                 try:
@@ -126,25 +122,18 @@ class ParallelMap(object):
             self.rvqueue = Queue.Queue()
         else:
             self.rvqueue = None
-        
-        # Check threadcache
-        #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
-        #    del THREADCACHE[:]
-        #    THREADCACHEPID = os.getpid()
     
         self.workers = []
+
+        # initialize workers
         for x in xrange(maxthreads):
             t = None
-            #if THREADCACHE:
-            #    try:
-            #        t = THREADCACHE.pop()
-            #    except:
-            #        pass
             if t is None:
                 t = WorkerThread()
                 t.setDaemon(True)
             else:
                 t.waitdone()
+
             t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
             self.workers.append(t)
     
@@ -152,13 +141,6 @@ class ParallelMap(object):
         self.destroy()
     
     def destroy(self):
-        # Check threadcache
-        #global THREADCACHE
-        #global THREADCACHEPID
-        #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
-        #    del THREADCACHE[:]
-        #    THREADCACHEPID = os.getpid()
-
         for worker in self.workers:
             worker.waitdone()
         for worker in self.workers:
@@ -168,9 +150,6 @@ class ParallelMap(object):
         for worker in self.workers:
             worker.quit()
 
-        # TO FIX:
-        # THREADCACHE.extend(self.workers)
-
         del self.workers[:]
         
     def put(self, callable, *args, **kwargs):
@@ -219,32 +198,6 @@ class ParallelMap(object):
                     except Queue.Empty:
                         raise StopIteration
             
-    
-class ParallelFilter(ParallelMap):
-    class _FILTERED:
-        pass
-    
-    def __filter(self, x):
-        if self.filter_condition(x):
-            return x
-        else:
-            return self._FILTERED
-    
-    def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
-        super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
-        self.filter_condition = filter_condition
-
-    def put(self, what):
-        super(ParallelFilter, self).put(self.__filter, what)
-    
-    def put_nowait(self, what):
-        super(ParallelFilter, self).put_nowait(self.__filter, what)
-        
-    def __iter__(self):
-        for rv in super(ParallelFilter, self).__iter__():
-            if rv is not self._FILTERED:
-                yield rv
-
 class ParallelRun(ParallelMap):
     def __run(self, x):
         fn, args, kwargs = x
@@ -260,27 +213,3 @@ class ParallelRun(ParallelMap):
         super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
 
 
-def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
-    mapper = ParallelMap(
-        maxthreads = maxthreads,
-        maxqueue = maxqueue,
-        results = True)
-    mapper.start()
-    for elem in iterable:
-        mapper.put(elem)
-    rv = list(mapper)
-    mapper.join()
-    return rv
-
-def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
-    filtrer = ParallelFilter(
-        condition,
-        maxthreads = maxthreads,
-        maxqueue = maxqueue)
-    filtrer.start()
-    for elem in iterable:
-        filtrer.put(elem)
-    rv = list(filtrer)
-    filtrer.join()
-    return rv
-