Initially working version of PlanetLab testbed implementation.
[nepi.git] / src / nepi / core / execute.py
index dea3866..add0909 100644 (file)
@@ -7,6 +7,9 @@ from nepi.util.constants import STATUS_FINISHED, TIME_NOW
 from nepi.util.parser._xml import XmlExperimentParser
 import sys
 import re
+import threading
+import ConfigParser
+import os
 
 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
@@ -94,7 +97,8 @@ class ConnectorType(object):
 # need a definition!
 class Factory(AttributesMap):
     def __init__(self, factory_id, create_function, start_function, 
-            stop_function, status_function, configure_function,
+            stop_function, status_function, 
+            configure_function, preconfigure_function,
             allow_addresses = False, allow_routes = False):
         super(Factory, self).__init__()
         self._factory_id = factory_id
@@ -105,6 +109,7 @@ class Factory(AttributesMap):
         self._stop_function = stop_function
         self._status_function = status_function
         self._configure_function = configure_function
+        self._preconfigure_function = preconfigure_function
         self._connector_types = dict()
         self._traces = list()
         self._box_attributes = AttributesMap()
@@ -145,6 +150,10 @@ class Factory(AttributesMap):
     def configure_function(self):
         return self._configure_function
 
+    @property
+    def preconfigure_function(self):
+        return self._preconfigure_function
+
     @property
     def traces(self):
         return self._traces
@@ -163,7 +172,7 @@ class Factory(AttributesMap):
         self._box_attributes.add_attribute(name, help, type, value, range, 
                 allowed, flags, validation_function)
 
-class TestbedInstance(object):
+class TestbedController(object):
     def __init__(self, testbed_id, testbed_version):
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
@@ -287,40 +296,136 @@ class TestbedInstance(object):
         raise NotImplementedError
 
 class ExperimentController(object):
-    def __init__(self, experiment_xml):
+    def __init__(self, experiment_xml, root_dir):
         self._experiment_xml = experiment_xml
         self._testbeds = dict()
         self._access_config = dict()
         self._netrefs = dict()
+        self._root_dir = root_dir
+
+        self.persist_experiment_xml()
 
     @property
     def experiment_xml(self):
         return self._experiment_xml
 
+    def persist_experiment_xml(self):
+        xml_path = os.path.join(self._root_dir, "experiment.xml")
+        f = open(xml_path, "w")
+        f.write(self._experiment_xml)
+        f.close()
+
     def set_access_configuration(self, testbed_guid, access_config):
         self._access_config[testbed_guid] = access_config
 
-    def trace(self, testbed_guid, guid, trace_id):
-        return self._testbeds[testbed_guid].trace(guid, trace_id)
+    def trace(self, testbed_guid, guid, trace_id, attribute='value'):
+        return self._testbeds[testbed_guid].trace(guid, trace_id, attribute)
+
+    @staticmethod
+    def _parallel(callables):
+        threads = [ threading.Thread(target=callable) for callable in callables ]
+        for thread in threads:
+            thread.start()
+        for thread in threads:
+            thread.join()
 
     def start(self):
         self._create_testbed_instances()
-        for testbed in self._testbeds.values():
-            testbed.do_setup()
-        for testbed in self._testbeds.values():
-            testbed.do_create()
-            testbed.do_connect()
+        
+        # persist testbed connection data, for potential recovery
+        self._persist_testbed_proxies()
+        
+        # perform setup in parallel for all test beds,
+        # wait for all threads to finish
+        self._parallel([testbed.do_setup 
+                        for testbed in self._testbeds.itervalues()])
+        
+        # perform create-connect in parallel, wait
+        # (internal connections only)
+        self._parallel([lambda : (testbed.do_create(), 
+                                  testbed.do_connect(),
+                                  testbed.do_preconfigure())
+                        for testbed in self._testbeds.itervalues()])
+        
+        # resolve netrefs
         self.do_netrefs(fail_if_undefined=True)
-        for testbed in self._testbeds.values():
-            testbed.do_configure()
+        
+        # perform do_configure in parallel for al testbeds
+        # (it's internal configuration for each)
+        self._parallel([testbed.do_configure
+                        for testbed in self._testbeds.itervalues()])
+
+        # cross-connect (cannot be done in parallel)
         for testbed in self._testbeds.values():
             testbed.do_cross_connect()
-        for testbed in self._testbeds.values():
-            testbed.start()
+        
+        # start experiment (parallel start on all testbeds)
+        self._parallel([testbed.start
+                        for testbed in self._testbeds.itervalues()])
+
+    def _persist_testbed_proxies(self):
+        TRANSIENT = ('Recover',)
+        
+        # persist access configuration for all testbeds, so that
+        # recovery mode can reconnect to them if it becomes necessary
+        conf = ConfigParser.RawConfigParser()
+        for testbed_guid, testbed_config in self._access_config.iteritems():
+            testbed_guid = str(testbed_guid)
+            conf.add_section(testbed_guid)
+            for attr in testbed_config.attributes_name:
+                if attr not in TRANSIENT:
+                    conf.set(testbed_guid, attr, 
+                        testbed_config.get_attribute_value(attr))
+        
+        f = open(os.path.join(self._root_dir, 'access_config.ini'), 'w')
+        conf.write(f)
+        f.close()
+    
+    def _load_testbed_proxies(self):
+        TYPEMAP = {
+            STRING : 'get',
+            INTEGER : 'getint',
+            FLOAT : 'getfloat',
+            BOOLEAN : 'getboolean',
+        }
+        
+        conf = ConfigParser.RawConfigParser()
+        conf.read(os.path.join(self._root_dir, 'access_config.ini'))
+        for testbed_guid in conf.sections():
+            testbed_config = proxy.AccessConfiguration()
+            for attr in conf.options(testbed_guid):
+                testbed_config.set_attribute_value(attr, 
+                    conf.get(testbed_guid, attr) )
+                
+            testbed_guid = str(testbed_guid)
+            conf.add_section(testbed_guid)
+            for attr in testbed_config.attributes_name:
+                if attr not in TRANSIENT:
+                    getter = getattr(conf, TYPEMAP.get(
+                        testbed_config.get_attribute_type(attr),
+                        'get') )
+                    testbed_config.set_attribute_value(
+                        testbed_guid, attr, getter(attr))
+    
+    def _unpersist_testbed_proxies(self):
+        try:
+            os.remove(os.path.join(self._root_dir, 'access_config.ini'))
+        except:
+            # Just print exceptions, this is just cleanup
+            import traceback
+            traceback.print_exc(file=sys.stderr)
 
     def stop(self):
        for testbed in self._testbeds.values():
            testbed.stop()
+       self._unpersist_testbed_proxies()
+   
+    def recover(self):
+        # reload perviously persisted testbed access configurations
+        self._load_testbed_proxies()
+        
+        # recreate testbed proxies by reconnecting only
+        self._create_testbed_instances(recover=True)
 
     def is_finished(self, guid):
         for testbed in self._testbeds.values():
@@ -390,7 +495,7 @@ class ExperimentController(object):
                                     if fail_if_undefined:
                                         raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
 
-    def _create_testbed_instances(self):
+    def _create_testbed_instances(self, recover = False):
         parser = XmlExperimentParser()
         data = parser.from_xml_to_data(self._experiment_xml)
         element_guids = list()
@@ -402,6 +507,14 @@ class ExperimentController(object):
                 (testbed_id, testbed_version) = data.get_testbed_data(guid)
                 access_config = None if guid not in self._access_config else\
                         self._access_config[guid]
+                
+                if recover and access_config is None:
+                    # need to create one
+                    access_config = self._access_config[guid] = proxy.AccessConfiguration()
+                if access_config is not None:
+                    # force recovery mode 
+                    access_config.set_attribute_value("recover",recover)
+                
                 testbed = proxy.create_testbed_instance(testbed_id, 
                         testbed_version, access_config)
                 for (name, value) in data.get_attribute_data(guid):
@@ -438,7 +551,8 @@ class ExperimentController(object):
                                     # (which could require high-latency network I/O)
                                     (testbed_guid, factory_id) = data.get_box_data(guid)
                                     netrefs.setdefault((testbed_guid,guid),set()).add(name)
-        self._program_testbed_instances(element_guids, data)
+        if not recover:
+            self._program_testbed_instances(element_guids, data)
 
     def _program_testbed_instances(self, element_guids, data):
         for guid in element_guids: