Ticket #11: parallel execution
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 13 Apr 2011 08:56:14 +0000 (10:56 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 13 Apr 2011 08:56:14 +0000 (10:56 +0200)
src/nepi/core/execute.py

index dea3866..0472307 100644 (file)
@@ -7,6 +7,7 @@ from nepi.util.constants import STATUS_FINISHED, TIME_NOW
 from nepi.util.parser._xml import XmlExperimentParser
 import sys
 import re
+import threading
 
 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
@@ -303,20 +304,43 @@ class ExperimentController(object):
     def trace(self, testbed_guid, guid, trace_id):
         return self._testbeds[testbed_guid].trace(guid, trace_id)
 
+    @staticmethod
+    def _parallel(callables):
+        threads = [ threading.Thread(target=callable) for callable in callables ]
+        for thread in threads:
+            thread.start()
+        for thread in threads:
+            thread.join()
+
     def start(self):
         self._create_testbed_instances()
-        for testbed in self._testbeds.values():
-            testbed.do_setup()
-        for testbed in self._testbeds.values():
-            testbed.do_create()
-            testbed.do_connect()
+        
+        # perform setup in parallel for all test beds,
+        # wait for all threads to finish
+        self._parallel([testbed.do_setup 
+                        for testbed in self._testbeds.itervalues()])
+        
+        # perform create-connect in parallel, wait
+        # (internal connections only)
+        self._parallel([lambda : (testbed.do_create(), 
+                                  testbed.do_connect())
+                        for testbed in self._testbeds.itervalues()])
+        
+        # resolve netrefs
         self.do_netrefs(fail_if_undefined=True)
-        for testbed in self._testbeds.values():
-            testbed.do_configure()
+        
+        # perform do_configure in parallel for al testbeds
+        # (it's internal configuration for each)
+        self._parallel([testbed.do_configure
+                        for testbed in self._testbeds.itervalues()])
+
+        # cross-connect (cannot be done in parallel)
         for testbed in self._testbeds.values():
             testbed.do_cross_connect()
-        for testbed in self._testbeds.values():
-            testbed.start()
+        
+        # start experiment (parallel start on all testbeds)
+        self._parallel([testbed.start
+                        for testbed in self._testbeds.itervalues()])
 
     def stop(self):
        for testbed in self._testbeds.values():