Fix shutdown order to respect creation order (important when running nepi-in-nepi)
[nepi.git] / src / nepi / core / execute.py
index a729ba6..cf32e6a 100644 (file)
@@ -13,8 +13,10 @@ import os
 import collections
 import functools
 import time
+import logging
+logging.basicConfig()
 
-ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
+ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
 
@@ -239,7 +241,15 @@ class ExperimentController(object):
         self._failed_testbeds = set()
         self._started_time = None
         self._stopped_time = None
-        
+        self._testbed_order = []
+      
+        self._logger = logging.getLogger('nepi.core.execute')
+        level = logging.ERROR
+        if os.environ.get("NEPI_CONTROLLER_LOGLEVEL", 
+                DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
+            level = logging.DEBUG
+        self._logger.setLevel(level)
         if experiment_xml is None and root_dir is not None:
             # Recover
             self.load_experiment_xml()
@@ -314,14 +324,17 @@ class ExperimentController(object):
     def _parallel(callables):
         excs = []
         def wrap(callable):
-            @functools.wraps(callable)
             def wrapped(*p, **kw):
                 try:
                     callable(*p, **kw)
                 except:
-                    import traceback
-                    traceback.print_exc(file=sys.stderr)
+                    logging.exception("Exception occurred in asynchronous thread:")
                     excs.append(sys.exc_info())
+            try:
+                wrapped = functools.wraps(callable)(wrapped)
+            except:
+                # functools.partial not wrappable
+                pass
             return wrapped
         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
         for thread in threads:
@@ -359,38 +372,50 @@ class ExperimentController(object):
                     self._testbeds[guid].do_setup()
                     self._testbeds[guid].recover()
                 except:
+                    self._logger.exception("During recovery of testbed %s", guid)
+                    
                     # Mark failed
                     self._failed_testbeds.add(guid)
     
         def steps_to_configure(self, allowed_guids):
             # perform setup in parallel for all test beds,
             # wait for all threads to finish
+
+            self._logger.debug("ExperimentController: Starting parallel do_setup")
             self._parallel([testbed.do_setup 
                             for guid,testbed in self._testbeds.iteritems()
                             if guid in allowed_guids])
        
             # perform create-connect in parallel, wait
             # (internal connections only)
+            self._logger.debug("ExperimentController: Starting parallel do_create")
             self._parallel([testbed.do_create
                             for guid,testbed in self._testbeds.iteritems()
                             if guid in allowed_guids])
 
+            self._logger.debug("ExperimentController: Starting parallel do_connect_init")
             self._parallel([testbed.do_connect_init
                             for guid,testbed in self._testbeds.iteritems()
                             if guid in allowed_guids])
 
+            self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
             self._parallel([testbed.do_connect_compl
                             for guid,testbed in self._testbeds.iteritems()
                             if guid in allowed_guids])
 
+            self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
             self._parallel([testbed.do_preconfigure
                             for guid,testbed in self._testbeds.iteritems()
                             if guid in allowed_guids])
             self._clear_caches()
+            
+            # Store testbed order
+            self._testbed_order.append(allowed_guids)
 
         steps_to_configure(self, to_restart)
 
         if self._netreffed_testbeds:
+            self._logger.debug("ExperimentController: Resolving netreffed testbeds")
             # initally resolve netrefs
             self.do_netrefs(data, fail_if_undefined=False)
             
@@ -410,6 +435,8 @@ class ExperimentController(object):
                         self._testbeds[guid].do_setup()
                         self._testbeds[guid].recover()
                     except:
+                        self._logger.exception("During recovery of testbed %s", guid)
+
                         # Mark failed
                         self._failed_testbeds.add(guid)
 
@@ -419,14 +446,17 @@ class ExperimentController(object):
         all_restart = [ self._testbeds[guid] for guid in all_restart ]
             
         # final netref step, fail if anything's left unresolved
-        self.do_netrefs(data, fail_if_undefined=True)
+        self._logger.debug("ExperimentController: Resolving do_netrefs")
+        self.do_netrefs(data, fail_if_undefined=False)
        
         # Only now, that netref dependencies have been solve, it is safe to
         # program cross_connections
+        self._logger.debug("ExperimentController: Programming testbed cross-connections")
         self._program_testbed_cross_connections(data)
  
         # perform do_configure in parallel for al testbeds
         # (it's internal configuration for each)
+        self._logger.debug("ExperimentController: Starting parallel do_configure")
         self._parallel([testbed.do_configure
                         for testbed in all_restart])
 
@@ -437,6 +467,7 @@ class ExperimentController(object):
         #time.sleep(60)
         
         # cross-connect (cannot be done in parallel)
+        self._logger.debug("ExperimentController: Starting cross-connect")
         for guid, testbed in self._testbeds.iteritems():
             cross_data = self._get_cross_data(guid)
             testbed.do_cross_connect_init(cross_data)
@@ -447,9 +478,13 @@ class ExperimentController(object):
         self._clear_caches()
 
         # Last chance to configure (parallel on all testbeds)
+        self._logger.debug("ExperimentController: Starting parallel do_prestart")
         self._parallel([testbed.do_prestart
                         for testbed in all_restart])
 
+        # final netref step, fail if anything's left unresolved
+        self.do_netrefs(data, fail_if_undefined=True)
         self._clear_caches()
         
         if not recover:
@@ -459,6 +494,7 @@ class ExperimentController(object):
             self.persist_execute_xml()
 
         # start experiment (parallel start on all testbeds)
+        self._logger.debug("ExperimentController: Starting parallel do_start")
         self._parallel([testbed.start
                         for testbed in all_restart])
 
@@ -479,8 +515,9 @@ class ExperimentController(object):
             conf.add_section(testbed_guid)
             for attr in testbed_config.get_attribute_list():
                 if attr not in TRANSIENT:
-                    conf.set(testbed_guid, attr, 
-                        testbed_config.get_attribute_value(attr))
+                    value = testbed_config.get_attribute_value(attr)
+                    if value is not None:
+                        conf.set(testbed_guid, attr, value)
         
         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
         conf.write(f)
@@ -511,18 +548,19 @@ class ExperimentController(object):
                     getter = getattr(conf, TYPEMAP.get(
                         testbed_config.get_attribute_type(attr),
                         'get') )
-                    testbed_config.set_attribute_value(
-                        attr, getter(testbed_guid, attr))
+                    try:
+                        value = getter(testbed_guid, attr)
+                        testbed_config.set_attribute_value(attr, value)
+                    except ConfigParser.NoOptionError:
+                        # Leave default
+                        pass
     
     def _unpersist_testbed_proxies(self):
         try:
             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
         except:
             # Just print exceptions, this is just cleanup
-            import traceback
-            ######## BUG ##########
-            #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
-            #traceback.print_exc(file=sys.stderr)
+            self._logger.exception("Loading testbed configuration")
 
     def _update_execute_xml(self):
         # For all testbeds,
@@ -663,11 +701,26 @@ class ExperimentController(object):
 
     def shutdown(self):
         exceptions = list()
-        for testbed in self._testbeds.values():
+        ordered_testbeds = set()
+
+        def shutdown_testbed(guid):
             try:
+                testbed = self._testbeds[guid]
+                ordered_testbeds.add(guid)
                 testbed.shutdown()
             except:
                 exceptions.append(sys.exc_info())
+                
+        self._logger.debug("ExperimentController: Starting parallel shutdown")
+        
+        for testbed_guids in reversed(self._testbed_order):
+            self._parallel([functools.partial(shutdown_testbed, guid)
+                            for guid in testbed_guids])
+        remaining_guids = set(self._testbeds) - ordered_testbeds
+        if remaining_guids:
+            self._parallel([functools.partial(shutdown_testbed, guid)
+                            for guid in remaining_guids])
+            
         for exc_info in exceptions:
             raise exc_info[0], exc_info[1], exc_info[2]
 
@@ -704,7 +757,7 @@ class ExperimentController(object):
                 testbed.get_route(guid, int(index), name),
         'trace' :
             lambda testbed, guid, index, name: 
-                testbed.trace(guid, index, name),
+                testbed.trace(guid, index, attribute = name),
         '' : 
             lambda testbed, guid, index, name: 
                 testbed.get(guid, name),
@@ -858,12 +911,13 @@ class ExperimentController(object):
                             if not label.startswith('GUID-'):
                                 ref_guid = label_guids.get(label)
                                 if ref_guid is not None:
-                                    value = ATTRIBUTE_PATTERN_BASE.sub(
+                                    value = value.replace(
+                                        match.group(),
                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
                                             guid = 'GUID-%d' % (ref_guid,),
                                             expr = match.group("expr"),
-                                            label = label)
-                                        value)
+                                            label = label)
+                                    )
                                     data.set_attribute_data(guid, name, value)
                                     
                                     # memorize which guid-attribute pairs require
@@ -993,6 +1047,8 @@ class ExperimentController(object):
                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
                                     cross_connector_type_name)
                             # save cross data for later
+                            self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
+                                    (testbed_guid, guid, cross_testbed_guid, cross_guid))
                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
                                     cross_guid)
 
@@ -1041,4 +1097,65 @@ class ExperimentController(object):
                     elem_cross_data[attr_name] = _undefer(attr_value)
         
         return cross_data
-    
+    """
+class ExperimentSuite(object):
+    def __init__(self, experiment_xml, access_config, repetitions,
+            duration, wait_guids):
+        self._experiment_xml = experiment_xml
+        self._access_config = access_config
+        self._experiments = dict()
+        self._repetitions = repetitions
+        self._duration = duration
+        self._wait_guids = wait_guids
+        self._current = None
+        self._status = TS.STATUS_ZERO
+        self._thread = None
+
+    def start(self):
+        self._status  = TS.STATUS_STARTED
+        self._thread = threading.Thread(target = self._run_experiment_suite)
+        self._thread.start()
+
+    def shutdown(self):
+        if self._thread:
+            self._thread.join()
+            self._thread = None
+
+    def _run_experiment_suite(self):
+        for i in xrange[0, self.repetitions]:
+            self._current = i
+            self._run_one_experiment()
+
+    def _run_one_experiment(self):
+        access_config = proxy.AccessConfiguration()
+        for attr in self._access_config.attributes:
+            access_config.set_attribute_value(attr.name, attr.value)
+        access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        root_dir = "%s_%d" % (
+                access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
+                self._current)
+        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
+        controller = proxy.create_experiment_controller(self._experiment_xml,
+                access_config)
+        self._experiments[self._current] = controller
+        controller.start()
+        started_at = time.time()
+        # wait until all specified guids have finished execution
+        if self._wait_guids:
+            while all(itertools.imap(controller.is_finished, self._wait_guids):
+                time.sleep(0.5)
+        # wait until the minimum experiment duration time has elapsed 
+        if self._duration:
+            while (time.time() - started_at) < self._duration:
+                time.sleep(0.5)
+        controller.stop()
+        #download results!!
+        controller.shutdown()
+    """
+
+
+
+
+
+
+