ec_shutdown
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 7 Oct 2013 06:59:47 +0000 (08:59 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 7 Oct 2013 06:59:47 +0000 (08:59 +0200)
19 files changed:
src/nepi/execution/ec.py
src/nepi/execution/resource.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccn/ccnapplication.py
src/nepi/resources/linux/ccn/ccncontent.py
src/nepi/resources/linux/ccn/ccnd.py
src/nepi/resources/linux/ccn/ccnr.py
src/nepi/resources/linux/ccn/fibentry.py
src/nepi/resources/linux/interface.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/udptest.py
src/nepi/resources/linux/udptunnel.py
src/nepi/resources/omf/application.py
src/nepi/resources/omf/channel.py
src/nepi/resources/omf/interface.py
src/nepi/resources/omf/node.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/tap.py
src/nepi/util/parallel.py

index e59e626..30dd7a4 100644 (file)
@@ -36,6 +36,51 @@ import sys
 import time
 import threading
 
 import time
 import threading
 
+class FailurePolicy(object):
+    """ Defines how to respond to experiment failures  
+    """
+    IGNORE_RM_FAILURE = 1
+    ABORT_ON_RM_FAILURE = 2
+
+class FailureLevel(object):
+    """ Describe the system failure state
+    """
+    OK = 1
+    RM_FAILURE = 2
+    TASK_FAILURE = 3
+    EC_FAILURE = 4
+
+class FailureManager(object):
+    """ The FailureManager is responsible for handling errors,
+    and deciding whether an experiment should be aborted
+    """
+
+    def __init__(self, failure_policy = None):
+        self._failure_level = FailureLevel.OK
+        self._failure_policy = failure_policy or \
+                FailurePolicy.ABORT_ON_RM_FAILURE
+
+    @property
+    def abort(self):
+        if self._failure_level == FailureLevel.EC_FAILURE:
+            return True
+
+        if self._failure_level in [FailureLevel.TASK_FAILURE, 
+                FailureLevel.RM_FAILURE] and \
+                        self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
+            return True
+
+        return False
+
+    def set_rm_failure(self):
+        self._failure_level = FailureLevel.RM_FAILURE
+
+    def set_task_failure(self):
+        self._failure_level = FailureLevel.TASK_FAILURE
+
+    def set_ec_failure(self):
+        self._failure_level = FailureLevel.EC_FAILURE
+
 class ECState(object):
     """ State of the Experiment Controller
    
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -49,19 +94,23 @@ class ExperimentController(object):
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
-                       It will be used in the name of the directory 
-                        where experiment related information is stored
         :type exp_id: str
 
     .. note::
 
         :type exp_id: str
 
     .. note::
 
-        An experiment, or scenario, is defined by a concrete use, behavior,
-        configuration and interconnection of resources that describe a single
-        experiment case (We call this the experiment description). 
-        A same experiment (scenario) can be run many times.
+        An experiment, or scenario, is defined by a concrete set of resources,
+        behavior, configuration and interconnection of those resources. 
+        The Experiment Description (ED) is a detailed representation of a
+        single experiment. It contains all the necessary information to 
+        allow repeating the experiment. NEPI allows to describe
+        experiments by registering components (resources), configuring them
+        and interconnecting them.
+        
+        A same experiment (scenario) can be executed many times, generating 
+        different results. We call an experiment execution (instance) a 'run'.
 
 
-        The ExperimentController (EC), is the entity responsible for 
-        managing an experiment instance (run). The same scenario can be 
+        The ExperimentController (EC), is the entity responsible of
+        managing an experiment run. The same scenario can be 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
@@ -75,15 +124,15 @@ class ExperimentController(object):
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
-        In order for a new type of resource to be supported in NEPI
-        a new RM must be implemented. NEPI already provides different
+        To support a new type of resource in NEPI, a new RM must be 
+        implemented. NEPI already provides a variety of
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
-        configure them and interconnect them, in order to describe an experiment.
+        configure them and interconnect them, to describe an experiment.
         Describing an experiment through the EC does not run the experiment.
         Describing an experiment through the EC does not run the experiment.
-        Only when the 'deploy()' method is invoked on the EC, will the EC take 
+        Only when the 'deploy()' method is invoked on the EC, the EC will take 
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
@@ -97,8 +146,8 @@ class ExperimentController(object):
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
-        exp_id, which can be re-used by different ExperimentController instances,
-        and the run_id, which unique to a ExperimentController instance, and
+        exp_id, which can be re-used in different ExperimentController,
+        and the run_id, which is unique to one ExperimentController instance, and
         is automatically generated by NEPI.
         
     """
         is automatically generated by NEPI.
         
     """
@@ -108,12 +157,16 @@ class ExperimentController(object):
         # Logging
         self._logger = logging.getLogger("ExperimentController")
 
         # Logging
         self._logger = logging.getLogger("ExperimentController")
 
-        # Run identifier. It identifies a concrete instance (run) of an experiment.
-        # Since a same experiment (same configuration) can be run many times,
-        # this id permits to identify concrete exoeriment run
+        # Run identifier. It identifies a concrete execution instance (run) 
+        # of an experiment.
+        # Since a same experiment (same configuration) can be executed many 
+        # times, this run_id permits to separate result files generated on 
+        # different experiment executions
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
+        # Identifies the experiment scenario (i.e. configuration, 
+        # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
@@ -128,7 +181,7 @@ class ExperimentController(object):
         # Tasks
         self._tasks = dict()
 
         # Tasks
         self._tasks = dict()
 
-        # RM groups 
+        # RM groups (for deployment) 
         self._groups = dict()
 
         # generator of globally unique id for groups
         self._groups = dict()
 
         # generator of globally unique id for groups
@@ -140,6 +193,12 @@ class ExperimentController(object):
         self._thread.setDaemon(True)
         self._thread.start()
 
         self._thread.setDaemon(True)
         self._thread.start()
 
+        # Flag to stop processing thread
+        self._stop = False
+    
+        # Entity in charge of managing system failures
+        self._fm = FailureManager()
+
         # EC state
         self._state = ECState.RUNNING
 
         # EC state
         self._state = ECState.RUNNING
 
@@ -172,69 +231,86 @@ class ExperimentController(object):
         return self._run_id
 
     @property
         return self._run_id
 
     @property
-    def finished(self):
-        """ Put the state of the Experiment Controller into a final state :
-            Either TERMINATED or FAILED
+    def abort(self):
+        return self._fm.abort
 
 
-        """
-        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+    def set_rm_failure(self):
+        self._fm.set_rm_failure()
 
     def wait_finished(self, guids):
 
     def wait_finished(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
+            RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
 
         :param guids: List of guids
         :type guids: list
+
         """
         """
-        return self.wait(guids)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STOPPED, 
+                quit = quit)
 
     def wait_started(self, guids):
 
     def wait_started(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STARTED or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, state = ResourceState.STARTED)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STARTED, 
+                quit = quit)
 
     def wait_released(self, guids):
 
     def wait_released(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state RELEASED (or FAILED)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state = RELEASED or until the EC fails
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        # TODO: solve state concurrency BUG and !!!!
-        # correct waited release state to state = ResourceState.FAILED)
-        return self.wait(guids, state = ResourceState.FINISHED)
+
+        def quit():
+            return self._state == ECState.FAILED
+
+        return self.wait(guids, state = ResourceState.RELEASED, 
+                quit = quit)
 
     def wait_deployed(self, guids):
 
     def wait_deployed(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state READY (or any higher state)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= READY or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, state = ResourceState.READY)
 
 
-    def wait(self, guids, state = ResourceState.STOPPED):
-        """ Blocking method that waits until all the RM from the 'guid' list 
-            reached state 'state' or until a failure occurs
-            
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.READY, 
+                quit = quit)
+
+    def wait(self, guids, state, quit):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= 'state' or until quit yileds True
+           
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        # we randomly alter the order of the guids to avoid ordering
-        # dependencies (e.g. LinuxApplication RMs runing on the same
-        # linux host will be synchronized by the LinuxNode SSH lock)
-        random.shuffle(guids)
-
         while True:
         while True:
-            # If no more guids to wait for or an error occured, then exit
-            if len(guids) == 0 or self.finished:
+            # If there are no more guids to wait for
+            # or the quit function returns True, exit the loop
+            if len(guids) == 0 or quit():
                 break
 
             # If a guid reached one of the target states, remove it from list
                 break
 
             # If a guid reached one of the target states, remove it from list
@@ -245,26 +321,11 @@ class ExperimentController(object):
                 guids.remove(guid)
             else:
                 # Debug...
                 guids.remove(guid)
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
-                    self.state(guid, hr = True), state))
-
-                # Take the opportunity to 'refresh' the states of the RMs.
-                # Query only the first up to N guids (not to overwhelm 
-                # the local machine)
-                n = 100
-                lim = n if len(guids) > n else ( len(guids) -1 )
-                nguids = guids[0: lim]
-
-                # schedule state request for all guids (take advantage of
-                # scheduler multi threading).
-                for guid in nguids:
-                    callback = functools.partial(self.state, guid)
-                    self.schedule("0s", callback)
-
-                # If the guid is not in one of the target states, wait and
-                # continue quering. We keep the sleep big to decrease the
-                # number of RM state queries
-                time.sleep(4)
+                hrstate = ResourceState2str.get(rstate)
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+                    guid, rstate, state))
+
+            time.sleep(0.5)
   
     def get_task(self, tid):
         """ Get a specific task
   
     def get_task(self, tid):
         """ Get a specific task
@@ -574,7 +635,7 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not guids:
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not guids:
-            # If no guids list was indicated, all 'NEW' RMs will be deployed
+            # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
             guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
@@ -584,6 +645,7 @@ class ExperimentController(object):
             guids = [guids]
 
         # Create deployment group
             guids = [guids]
 
         # Create deployment group
+        # New guids can be added to a same deployment group later on
         new_group = False
         if not group:
             new_group = True
         new_group = False
         if not group:
             new_group = True
@@ -594,20 +656,9 @@ class ExperimentController(object):
 
         self._groups[group].extend(guids)
 
 
         self._groups[group].extend(guids)
 
-        # Before starting deployment we disorder the guids list with the
-        # purpose of speeding up the whole deployment process.
-        # It is likely that the user inserted in the 'guids' list closely
-        # resources one after another (e.g. all applications
-        # connected to the same node can likely appear one after another).
-        # This can originate a slow down in the deployment since the N 
-        # threads the parallel runner uses to processes tasks may all
-        # be taken up by the same family of resources waiting for the 
-        # same conditions (e.g. LinuxApplications running on a same 
-        # node share a single lock, so they will tend to be serialized).
-        # If we disorder the guids list, this problem can be mitigated.
-        random.shuffle(guids)
-
         def wait_all_and_start(group):
         def wait_all_and_start(group):
+            # Function that checks if all resources are READY
+            # before scheduling a start_with_conditions for each RM
             reschedule = False
             
             # Get all guids in group
             reschedule = False
             
             # Get all guids in group
@@ -630,9 +681,9 @@ class ExperimentController(object):
         if wait_all_ready and new_group:
             # Schedule a function to check that all resources are
             # READY, and only then schedule the start.
         if wait_all_ready and new_group:
             # Schedule a function to check that all resources are
             # READY, and only then schedule the start.
-            # This aimes at reducing the number of tasks looping in the 
+            # This aims at reducing the number of tasks looping in the 
             # scheduler. 
             # scheduler. 
-            # Intead of having N start tasks, we will have only one for 
+            # Instead of having many start tasks, we will have only one for 
             # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             self.schedule("1s", callback)
             # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             self.schedule("1s", callback)
@@ -672,11 +723,17 @@ class ExperimentController(object):
         Releases all the resources and stops task processing thread
 
         """
         Releases all the resources and stops task processing thread
 
         """
+        # TODO: Clean the parallel runner!! STOP all ongoing tasks
+        ####
+
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
+        # Stop processing thread
+        self._stop = True
+
         # Notify condition to wake up the processing thread
         self._notify()
         
         # Notify condition to wake up the processing thread
         self._notify()
         
@@ -754,8 +811,8 @@ class ExperimentController(object):
         runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
         runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
-        try:
-            while not self.finished:
+        while not self._stop:
+            try:
                 self._cond.acquire()
 
                 task = self._scheduler.next()
                 self._cond.acquire()
 
                 task = self._scheduler.next()
@@ -784,16 +841,20 @@ class ExperimentController(object):
                 if task:
                     # Process tasks in parallel
                     runner.put(self._execute, task)
                 if task:
                     # Process tasks in parallel
                     runner.put(self._execute, task)
-        except: 
-            import traceback
-            err = traceback.format_exc()
-            self.logger.error("Error while processing tasks in the EC: %s" % err)
+            except: 
+                import traceback
+                err = traceback.format_exc()
+                self.logger.error("Error while processing tasks in the EC: %s" % err)
 
 
-            self._state = ECState.FAILED
-        finally:   
-            self.logger.debug("Exiting the task processing loop ... ")
-            runner.sync()
-            runner.destroy()
+                # Set the EC to FAILED state 
+                self._state = ECState.FAILED
+
+                # Set the FailureManager failure level
+                self._fm.set_ec_failure()
+
+        self.logger.debug("Exiting the task processing loop ... ")
+        runner.sync()
+        runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
 
     def _execute(self, task):
         """ Executes a single task. 
@@ -821,15 +882,8 @@ class ExperimentController(object):
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
-            # Set the EC to FAILED state (this will force to exit the task
-            # processing thread)
-            self._state = ECState.FAILED
-
-            # Notify condition to wake up the processing thread
-            self._notify()
-
-            # Propage error to the ParallelRunner
-            raise
+            # Set the FailureManager failure level
+            self._fm.set_task_failure()
 
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
 
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
index a9087ae..18edeb0 100644 (file)
@@ -403,6 +403,7 @@ class ResourceManager(Logger):
  
     def fail(self):
         self.set_failed()
  
     def fail(self):
         self.set_failed()
+        self.ec.set_rm_failure()
 
     def set(self, name, value):
         """ Set the value of the attribute
 
     def set(self, name, value):
         """ Set the value of the attribute
@@ -741,7 +742,6 @@ class ResourceManager(Logger):
             self.debug("----- STARTING ---- ")
             self.deploy()
 
             self.debug("----- STARTING ---- ")
             self.deploy()
 
-
     def connect(self, guid):
         """ Performs actions that need to be taken upon associating RMs.
         This method should be redefined when necessary in child classes.
     def connect(self, guid):
         """ Performs actions that need to be taken upon associating RMs.
         This method should be redefined when necessary in child classes.
index 4f4b64e..763106c 100644 (file)
@@ -301,7 +301,7 @@ class LinuxApplication(ResourceManager):
         # Since provisioning takes a long time, before
         # each step we check that the EC is still 
         for step in steps:
         # Since provisioning takes a long time, before
         # each step we check that the EC is still 
         for step in steps:
-            if self.ec.finished:
+            if self.ec.abort:
                 raise RuntimeError, "EC finished"
             
             ret = step()
                 raise RuntimeError, "EC finished"
             
             ret = step()
@@ -484,7 +484,7 @@ class LinuxApplication(ResourceManager):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
 
             super(LinuxApplication, self).deploy()
     
 
             super(LinuxApplication, self).deploy()
     
@@ -499,10 +499,14 @@ class LinuxApplication(ResourceManager):
             self.set_finished()
         else:
 
             self.set_finished()
         else:
 
-            if self.in_foreground:
-                self._run_in_foreground()
-            else:
-                self._run_in_background()
+            try:
+                if self.in_foreground:
+                    self._run_in_foreground()
+                else:
+                    self._run_in_background()
+            except:
+                self.fail()
+                return
 
             super(LinuxApplication, self).start()
 
 
             super(LinuxApplication, self).start()
 
@@ -530,7 +534,6 @@ class LinuxApplication(ResourceManager):
                 blocking = False)
 
         if self._proc.poll():
                 blocking = False)
 
         if self._proc.poll():
-            self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
 
             self.error(msg, out, err)
             raise RuntimeError, msg
 
@@ -560,7 +563,6 @@ class LinuxApplication(ResourceManager):
         msg = " Failed to start command '%s' " % command
         
         if proc.poll():
         msg = " Failed to start command '%s' " % command
         
         if proc.poll():
-            self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
     
             self.error(msg, out, err)
             raise RuntimeError, msg
     
@@ -577,7 +579,6 @@ class LinuxApplication(ResourceManager):
 
             # Out is what was written in the stderr file
             if err:
 
             # Out is what was written in the stderr file
             if err:
-                self.fail()
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
@@ -609,8 +610,8 @@ class LinuxApplication(ResourceManager):
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                         self.fail()
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                         self.fail()
+                        return
         
         
-        if self.state == ResourceState.STARTED:
             super(LinuxApplication, self).stop()
 
     def release(self):
             super(LinuxApplication, self).stop()
 
     def release(self):
@@ -622,10 +623,7 @@ class LinuxApplication(ResourceManager):
 
         self.stop()
 
 
         self.stop()
 
-        if self.state != ResourceState.FAILED:
-            self.info("Resource released")
-
-            super(LinuxApplication, self).release()
+        super(LinuxApplication, self).release()
    
     @property
     def state(self):
    
     @property
     def state(self):
index 3d8943a..46a3cc4 100644 (file)
@@ -61,7 +61,7 @@ class LinuxCCNApplication(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
  
             self.debug("----- READY ---- ")
             self.set_ready()
  
             self.debug("----- READY ---- ")
             self.set_ready()
index cae55b5..1fa93cc 100644 (file)
@@ -98,8 +98,8 @@ class LinuxCCNContent(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return 
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -121,7 +121,6 @@ class LinuxCCNContent(LinuxApplication):
                 env, blocking = True)
 
         if proc.poll():
                 env, blocking = True)
 
         if proc.poll():
-            self.fail()
             msg = "Failed to execute command"
             self.error(msg, out, err)
             raise RuntimeError, msg
             msg = "Failed to execute command"
             self.error(msg, out, err)
             raise RuntimeError, msg
@@ -136,7 +135,6 @@ class LinuxCCNContent(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             sef.fail()
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             sef.fail()
-            raise RuntimeError, msg
 
     @property
     def _start_command(self):
 
     @property
     def _start_command(self):
index 4fdb0ce..71eb12a 100644 (file)
@@ -178,8 +178,8 @@ class LinuxCCND(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -211,8 +211,7 @@ class LinuxCCND(LinuxApplication):
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
         else:
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
-            self.set_failed()
-            raise RuntimeError, msg
+            self.fail()
 
     def stop(self):
         command = self.get('command') or ''
 
     def stop(self):
         command = self.get('command') or ''
index 378f93c..46c3f3b 100644 (file)
@@ -222,8 +222,8 @@ class LinuxCCNR(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return 
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -265,7 +265,6 @@ class LinuxCCNR(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
-            raise RuntimeError, msg
 
     @property
     def _start_command(self):
 
     @property
     def _start_command(self):
index aad03ca..48c2d6d 100644 (file)
@@ -136,8 +136,8 @@ class LinuxFIBEntry(LinuxApplication):
                 self.configure()
             except:
                 self.fail()
                 self.configure()
             except:
                 self.fail()
-                raise
+                return
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -161,7 +161,6 @@ class LinuxFIBEntry(LinuxApplication):
         if proc.poll():
             msg = "Failed to execute command"
             self.error(msg, out, err)
         if proc.poll():
             msg = "Failed to execute command"
             self.error(msg, out, err)
-            self.fail()
             raise RuntimeError, msg
         
     def configure(self):
             raise RuntimeError, msg
         
     def configure(self):
@@ -205,7 +204,6 @@ class LinuxFIBEntry(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
-            raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command')
 
     def stop(self):
         command = self.get('command')
@@ -224,7 +222,6 @@ class LinuxFIBEntry(LinuxApplication):
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
                 self.fail()
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
                 self.fail()
-                raise RuntimeError, msg
 
     @property
     def _start_command(self):
 
     @property
     def _start_command(self):
index 26a1549..a170f41 100644 (file)
@@ -243,7 +243,7 @@ class LinuxInterface(ResourceManager):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return 
 
             super(LinuxInterface, self).deploy()
 
 
             super(LinuxInterface, self).deploy()
 
index b50d7fa..9cab4ca 100644 (file)
@@ -326,7 +326,6 @@ class LinuxNode(ResourceManager):
     def provision(self):
         # check if host is alive
         if not self.is_alive():
     def provision(self):
         # check if host is alive
         if not self.is_alive():
-            self.fail()
             
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
@@ -361,7 +360,7 @@ class LinuxNode(ResourceManager):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
index 779b8c1..1b7ac9c 100644 (file)
@@ -259,7 +259,6 @@ class LinuxUdpTest(LinuxApplication):
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
                 self.fail()
                 msg = " Failed to execute command '%s'" % command
                 self.error(msg, out, err)
                 self.fail()
-                raise RuntimeError, msg
         else:
             super(LinuxUdpTest, self).start()
  
         else:
             super(LinuxUdpTest, self).start()
  
index 82f0139..4dae96c 100644 (file)
@@ -141,7 +141,6 @@ class UdpTunnel(LinuxApplication):
         msg = " Failed to connect endpoints "
         
         if proc.poll():
         msg = " Failed to connect endpoints "
         
         if proc.poll():
-            self.fail()
             self.error(msg, out, err)
             raise RuntimeError, msg
     
             self.error(msg, out, err)
             raise RuntimeError, msg
     
@@ -154,7 +153,6 @@ class UdpTunnel(LinuxApplication):
             (out, err), proc = endpoint.node.check_errors(self.run_home(endpoint))
             # Out is what was written in the stderr file
             if err:
             (out, err), proc = endpoint.node.check_errors(self.run_home(endpoint))
             # Out is what was written in the stderr file
             if err:
-                self.fail()
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
@@ -202,7 +200,7 @@ class UdpTunnel(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
  
             self.debug("----- READY ---- ")
             self.set_ready()
  
             self.debug("----- READY ---- ")
             self.set_ready()
@@ -217,7 +215,6 @@ class UdpTunnel(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
-            raise RuntimeError, msg
 
     def stop(self):
         """ Stops application execution
 
     def stop(self):
         """ Stops application execution
@@ -238,8 +235,8 @@ class UdpTunnel(LinuxApplication):
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
                     self.fail()
                     msg = " Failed to STOP tunnel"
                     self.error(msg, err1, err2)
                     self.fail()
+                    return
 
 
-        if self.state == ResourceState.STARTED:
             self.set_stopped()
 
     @property
             self.set_stopped()
 
     @property
@@ -311,7 +308,6 @@ class UdpTunnel(LinuxApplication):
         else:
             msg = "Couldn't retrieve %s" % filename
             self.error(msg, out, err)
         else:
             msg = "Couldn't retrieve %s" % filename
             self.error(msg, out, err)
-            self.fail()
             raise RuntimeError, msg
 
         return result
             raise RuntimeError, msg
 
         return result
index 673f810..0bc0c62 100644 (file)
@@ -195,7 +195,7 @@ class OMFApplication(ResourceManager):
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
             msg = "Credentials were not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
-            #raise
+            return
 
         super(OMFApplication, self).stop()
 
 
         super(OMFApplication, self).stop()
 
index b51cd89..ccd67be 100644 (file)
@@ -163,7 +163,7 @@ class OMFChannel(ResourceManager):
             msg = "Channel's value is not initialized"
             self.error(msg)
             self.fail()
             msg = "Channel's value is not initialized"
             self.error(msg)
             self.fail()
-            raise
+            return
 
         self._nodes_guid = self._get_target(self._connections) 
         if self._nodes_guid == "reschedule" :
 
         self._nodes_guid = self._get_target(self._connections) 
         if self._nodes_guid == "reschedule" :
@@ -180,7 +180,7 @@ class OMFChannel(ResourceManager):
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
-            raise
+            return
 
         super(OMFChannel, self).deploy()
 
 
         super(OMFChannel, self).deploy()
 
index a3b9c3a..1fb1762 100644 (file)
@@ -102,7 +102,6 @@ class OMFWifiInterface(ResourceManager):
             msg = "Connection between %s %s and %s %s accepted" % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
             msg = "Connection between %s %s and %s %s accepted" % \
                 (self.rtype(), self._guid, rm.rtype(), guid)
             self.debug(msg)
-
             return True
 
         msg = "Connection between %s %s and %s %s refused" % \
             return True
 
         msg = "Connection between %s %s and %s %s refused" % \
@@ -123,7 +122,6 @@ class OMFWifiInterface(ResourceManager):
         if rm_list: return rm_list[0]
         return None
 
         if rm_list: return rm_list[0]
         return None
 
-
     def configure_iface(self):
         """ Configure the interface without the ip
 
     def configure_iface(self):
         """ Configure the interface without the ip
 
@@ -139,13 +137,11 @@ class OMFWifiInterface(ResourceManager):
                 self._omf_api.configure(self.node.get('hostname'), attrname, 
                         attrval)
         except AttributeError:
                 self._omf_api.configure(self.node.get('hostname'), attrname, 
                         attrval)
         except AttributeError:
-            self._state = ResourceState.FAILED
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.debug(msg)
-            #raise
+            self.error(msg)
+            raise
         
         super(OMFWifiInterface, self).provision()
         
         super(OMFWifiInterface, self).provision()
-        return True
 
     def configure_ip(self):
         """ Configure the ip of the interface
 
     def configure_ip(self):
         """ Configure the ip of the interface
@@ -162,23 +158,21 @@ class OMFWifiInterface(ResourceManager):
                     attrval)
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
                     attrval)
         except AttributeError:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.debug(msg)
-            self.fail()
-            #raise
+            self.error(msg)
+            raise
 
 
-        return True
 
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the interface.
         It becomes DEPLOYED after sending messages to configure the interface
         """
 
     def deploy(self):
         """ Deploy the RM. It means : Get the xmpp client and send messages 
         using OMF 5.4 protocol to configure the interface.
         It becomes DEPLOYED after sending messages to configure the interface
         """
-        if not self._omf_api :
+        if not self._omf_api:
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
-        if not self._omf_api :
+        if not self._omf_api:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
@@ -189,13 +183,13 @@ class OMFWifiInterface(ResourceManager):
             msg = "Interface's variable are not initialized"
             self.error(msg)
             self.fail()
             msg = "Interface's variable are not initialized"
             self.error(msg)
             self.fail()
-            return False
+            return
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
             self.fail()
 
         if not self.node.get('hostname') :
             msg = "The channel is connected with an undefined node"
             self.error(msg)
             self.fail()
-            return False
+            return
 
         # Just for information
         self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
 
         # Just for information
         self.debug(" " + self.rtype() + " ( Guid : " + str(self._guid) +") : " + \
@@ -203,23 +197,21 @@ class OMFWifiInterface(ResourceManager):
             self.get('essid') + " : " + self.get('ip'))
     
         # Check if the node is already deployed
             self.get('essid') + " : " + self.get('ip'))
     
         # Check if the node is already deployed
-        chk1 = True
-        if self.state < ResourceState.PROVISIONED:
-            chk1 = self.configure_iface()
-        if chk1:
-            chk2 = self.configure_ip()
+        try:
+            if self.state < ResourceState.PROVISIONED:
+                if self.configure_iface():
+                    self.configure_ip()
+        except:
+            self.fail()
+            return
 
 
-        if not (chk1 and chk2) :
-            return False
-            
         super(OMFWifiInterface, self).deploy()
         super(OMFWifiInterface, self).deploy()
-        return True
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
 
     def release(self):
         """ Clean the RM at the end of the experiment and release the API
 
         """
-        if self._omf_api :
+        if self._omf_api:
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
index 1421078..570afd0 100644 (file)
@@ -130,22 +130,22 @@ class OMFNode(ResourceManager):
             It becomes DEPLOYED after sending messages to enroll the node
 
         """ 
             It becomes DEPLOYED after sending messages to enroll the node
 
         """ 
-        if not self._omf_api :
+        if not self._omf_api:
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.ec.exp_id)
 
-        if not self._omf_api :
+        if not self._omf_api:
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
             return
 
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
             return
 
-        if not self.get('hostname') :
+        if not self.get('hostname'):
             msg = "Hostname's value is not initialized"
             self.error(msg)
             self.fail()
             msg = "Hostname's value is not initialized"
             self.error(msg)
             self.fail()
-            return False
+            return
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
 
         try:
             self._omf_api.enroll_host(self.get('hostname'))
@@ -153,7 +153,7 @@ class OMFNode(ResourceManager):
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
             msg = "Credentials are not initialzed. XMPP Connections impossible"
             self.error(msg)
             self.fail()
-            #raise AttributeError, msg
+            return
 
         super(OMFNode, self).deploy()
 
 
         super(OMFNode, self).deploy()
 
@@ -174,7 +174,6 @@ class OMFNode(ResourceManager):
            It becomes STARTED as soon as this method starts.
 
         """
            It becomes STARTED as soon as this method starts.
 
         """
-
         super(OMFNode, self).start()
 
     def stop(self):
         super(OMFNode, self).start()
 
     def stop(self):
@@ -188,7 +187,7 @@ class OMFNode(ResourceManager):
         """Clean the RM at the end of the experiment
 
         """
         """Clean the RM at the end of the experiment
 
         """
-        if self._omf_api :
+        if self._omf_api:
             self._omf_api.release(self.get('hostname'))
 
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
             self._omf_api.release(self.get('hostname'))
 
             OMFAPIFactory.release_api(self.get('xmppSlice'), 
index 3cd5b54..b26a4f3 100644 (file)
@@ -193,7 +193,6 @@ class PlanetlabNode(LinuxNode):
         cls._register_attribute(min_cpu)
         cls._register_attribute(max_cpu)
         cls._register_attribute(timeframe)
         cls._register_attribute(min_cpu)
         cls._register_attribute(max_cpu)
         cls._register_attribute(timeframe)
-        
 
     def __init__(self, ec, guid):
         super(PlanetlabNode, self).__init__(ec, guid)
 
     def __init__(self, ec, guid):
         super(PlanetlabNode, self).__init__(ec, guid)
index c4867b2..991fa99 100644 (file)
@@ -165,8 +165,8 @@ class PlanetlabTap(LinuxApplication):
                 self.provision()
             except:
                 self.fail()
                 self.provision()
             except:
                 self.fail()
-                raise
+                return
+
             self.debug("----- READY ---- ")
             self.set_ready()
 
             self.debug("----- READY ---- ")
             self.set_ready()
 
@@ -180,7 +180,6 @@ class PlanetlabTap(LinuxApplication):
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
             msg = " Failed to execute command '%s'" % command
             self.error(msg, out, err)
             self.fail()
-            raise RuntimeError, msg
 
     def stop(self):
         command = self.get('command') or ''
 
     def stop(self):
         command = self.get('command') or ''
@@ -206,8 +205,7 @@ class PlanetlabTap(LinuxApplication):
 
                 if out.strip().find(self.get("deviceName")) == -1: 
                     # tap is not running is not running (socket not found)
 
                 if out.strip().find(self.get("deviceName")) == -1: 
                     # tap is not running is not running (socket not found)
-                    self._finish_time = tnow()
-                    self._state = ResourceState.FINISHED
+                    self.finish()
 
             self._last_state_check = tnow()
 
 
             self._last_state_check = tnow()
 
@@ -243,7 +241,6 @@ class PlanetlabTap(LinuxApplication):
         else:
             msg = "Couldn't retrieve if_name"
             self.error(msg, out, err)
         else:
             msg = "Couldn't retrieve if_name"
             self.error(msg, out, err)
-            self.fail()
             raise RuntimeError, msg
 
         return if_name
             raise RuntimeError, msg
 
         return if_name
index 3ca20cd..6868c4a 100644 (file)
@@ -28,9 +28,6 @@ import os
 
 N_PROCS = None
 
 
 N_PROCS = None
 
-#THREADCACHE = []
-#THREADCACHEPID = None
-
 class WorkerThread(threading.Thread):
     class QUIT:
         pass
 class WorkerThread(threading.Thread):
     class QUIT:
         pass
@@ -100,9 +97,8 @@ class WorkerThread(threading.Thread):
 class ParallelMap(object):
     def __init__(self, maxthreads = None, maxqueue = None, results = True):
         global N_PROCS
 class ParallelMap(object):
     def __init__(self, maxthreads = None, maxqueue = None, results = True):
         global N_PROCS
-        #global THREADCACHE
-        #global THREADCACHEPID
-        
+       
+        # Compute maximum number of threads allowed by the system
         if maxthreads is None:
             if N_PROCS is None:
                 try:
         if maxthreads is None:
             if N_PROCS is None:
                 try:
@@ -126,25 +122,18 @@ class ParallelMap(object):
             self.rvqueue = Queue.Queue()
         else:
             self.rvqueue = None
             self.rvqueue = Queue.Queue()
         else:
             self.rvqueue = None
-        
-        # Check threadcache
-        #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
-        #    del THREADCACHE[:]
-        #    THREADCACHEPID = os.getpid()
     
         self.workers = []
     
         self.workers = []
+
+        # initialize workers
         for x in xrange(maxthreads):
             t = None
         for x in xrange(maxthreads):
             t = None
-            #if THREADCACHE:
-            #    try:
-            #        t = THREADCACHE.pop()
-            #    except:
-            #        pass
             if t is None:
                 t = WorkerThread()
                 t.setDaemon(True)
             else:
                 t.waitdone()
             if t is None:
                 t = WorkerThread()
                 t.setDaemon(True)
             else:
                 t.waitdone()
+
             t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
             self.workers.append(t)
     
             t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
             self.workers.append(t)
     
@@ -152,13 +141,6 @@ class ParallelMap(object):
         self.destroy()
     
     def destroy(self):
         self.destroy()
     
     def destroy(self):
-        # Check threadcache
-        #global THREADCACHE
-        #global THREADCACHEPID
-        #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
-        #    del THREADCACHE[:]
-        #    THREADCACHEPID = os.getpid()
-
         for worker in self.workers:
             worker.waitdone()
         for worker in self.workers:
         for worker in self.workers:
             worker.waitdone()
         for worker in self.workers:
@@ -168,9 +150,6 @@ class ParallelMap(object):
         for worker in self.workers:
             worker.quit()
 
         for worker in self.workers:
             worker.quit()
 
-        # TO FIX:
-        # THREADCACHE.extend(self.workers)
-
         del self.workers[:]
         
     def put(self, callable, *args, **kwargs):
         del self.workers[:]
         
     def put(self, callable, *args, **kwargs):
@@ -219,32 +198,6 @@ class ParallelMap(object):
                     except Queue.Empty:
                         raise StopIteration
             
                     except Queue.Empty:
                         raise StopIteration
             
-    
-class ParallelFilter(ParallelMap):
-    class _FILTERED:
-        pass
-    
-    def __filter(self, x):
-        if self.filter_condition(x):
-            return x
-        else:
-            return self._FILTERED
-    
-    def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
-        super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
-        self.filter_condition = filter_condition
-
-    def put(self, what):
-        super(ParallelFilter, self).put(self.__filter, what)
-    
-    def put_nowait(self, what):
-        super(ParallelFilter, self).put_nowait(self.__filter, what)
-        
-    def __iter__(self):
-        for rv in super(ParallelFilter, self).__iter__():
-            if rv is not self._FILTERED:
-                yield rv
-
 class ParallelRun(ParallelMap):
     def __run(self, x):
         fn, args, kwargs = x
 class ParallelRun(ParallelMap):
     def __run(self, x):
         fn, args, kwargs = x
@@ -260,27 +213,3 @@ class ParallelRun(ParallelMap):
         super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
 
 
         super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
 
 
-def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
-    mapper = ParallelMap(
-        maxthreads = maxthreads,
-        maxqueue = maxqueue,
-        results = True)
-    mapper.start()
-    for elem in iterable:
-        mapper.put(elem)
-    rv = list(mapper)
-    mapper.join()
-    return rv
-
-def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
-    filtrer = ParallelFilter(
-        condition,
-        maxthreads = maxthreads,
-        maxqueue = maxqueue)
-    filtrer.start()
-    for elem in iterable:
-        filtrer.put(elem)
-    rv = list(filtrer)
-    filtrer.join()
-    return rv
-