Ticket #25: controller recovery mode
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 19 Apr 2011 09:38:31 +0000 (11:38 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 19 Apr 2011 09:38:31 +0000 (11:38 +0200)
src/nepi/core/execute.py
src/nepi/util/proxy.py
test/core/integration.py
test/testbeds/ns3/integration.py

index 3dfcdb4..60c26cc 100644 (file)
@@ -8,6 +8,8 @@ from nepi.util.parser._xml import XmlExperimentParser
 import sys
 import re
 import threading
+import ConfigParser
+import os
 
 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
@@ -288,11 +290,12 @@ class TestbedInstance(object):
         raise NotImplementedError
 
 class ExperimentController(object):
-    def __init__(self, experiment_xml):
+    def __init__(self, experiment_xml, root_dir):
         self._experiment_xml = experiment_xml
         self._testbeds = dict()
         self._access_config = dict()
         self._netrefs = dict()
+        self._root_dir = root_dir
 
     @property
     def experiment_xml(self):
@@ -315,6 +318,9 @@ class ExperimentController(object):
     def start(self):
         self._create_testbed_instances()
         
+        # persist testbed connection data, for potential recovery
+        self._persist_testbed_proxies()
+        
         # perform setup in parallel for all test beds,
         # wait for all threads to finish
         self._parallel([testbed.do_setup 
@@ -342,9 +348,69 @@ class ExperimentController(object):
         self._parallel([testbed.start
                         for testbed in self._testbeds.itervalues()])
 
+    def _persist_testbed_proxies(self):
+        TRANSIENT = ('Recover',)
+        
+        # persist access configuration for all testbeds, so that
+        # recovery mode can reconnect to them if it becomes necessary
+        conf = ConfigParser.RawConfigParser()
+        for testbed_guid, testbed_config in self._access_config.iteritems():
+            testbed_guid = str(testbed_guid)
+            conf.add_section(testbed_guid)
+            for attr in testbed_config.attributes_name:
+                if attr not in TRANSIENT:
+                    conf.set(testbed_guid, attr, 
+                        testbed_config.get_attribute_value(attr))
+        
+        f = open(os.path.join(self._root_dir, 'access_config.ini'), 'w')
+        conf.write(f)
+        f.close()
+    
+    def _load_testbed_proxies(self):
+        TYPEMAP = {
+            STRING : 'get',
+            INTEGER : 'getint',
+            FLOAT : 'getfloat',
+            BOOLEAN : 'getboolean',
+        }
+        
+        conf = ConfigParser.RawConfigParser()
+        conf.read(os.path.join(self._root_dir, 'access_config.ini'))
+        for testbed_guid in conf.sections():
+            testbed_config = proxy.AccessConfiguration()
+            for attr in conf.options(testbed_guid):
+                testbed_config.set_attribute_value(attr, 
+                    conf.get(testbed_guid, attr) )
+                
+            testbed_guid = str(testbed_guid)
+            conf.add_section(testbed_guid)
+            for attr in testbed_config.attributes_name:
+                if attr not in TRANSIENT:
+                    getter = getattr(conf, TYPEMAP.get(
+                        testbed_config.get_attribute_type(attr),
+                        'get') )
+                    testbed_config.set_attribute_value(
+                        testbed_guid, attr, getter(attr))
+    
+    def _unpersist_testbed_proxies(self):
+        try:
+            os.remove(os.path.join(self._root_dir, 'access_config.ini'))
+        except:
+            # Just print exceptions, this is just cleanup
+            import traceback
+            traceback.print_exc(file=sys.stderr)
+
     def stop(self):
        for testbed in self._testbeds.values():
            testbed.stop()
+       self._unpersist_testbed_proxies()
+   
+    def recover(self):
+        # reload perviously persisted testbed access configurations
+        self._load_testbed_proxies()
+        
+        # recreate testbed proxies by reconnecting only
+        self._create_testbed_instances(recover=True)
 
     def is_finished(self, guid):
         for testbed in self._testbeds.values():
@@ -414,7 +480,7 @@ class ExperimentController(object):
                                     if fail_if_undefined:
                                         raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
 
-    def _create_testbed_instances(self):
+    def _create_testbed_instances(self, recover = False):
         parser = XmlExperimentParser()
         data = parser.from_xml_to_data(self._experiment_xml)
         element_guids = list()
@@ -426,6 +492,14 @@ class ExperimentController(object):
                 (testbed_id, testbed_version) = data.get_testbed_data(guid)
                 access_config = None if guid not in self._access_config else\
                         self._access_config[guid]
+                
+                if recover and access_config is None:
+                    # need to create one
+                    access_config = self._access_config[guid] = proxy.AccessConfiguration()
+                if access_config is not None:
+                    # force recovery mode 
+                    access_config.set_attribute_value("recover",recover)
+                
                 testbed = proxy.create_testbed_instance(testbed_id, 
                         testbed_version, access_config)
                 for (name, value) in data.get_attribute_data(guid):
@@ -462,7 +536,8 @@ class ExperimentController(object):
                                     # (which could require high-latency network I/O)
                                     (testbed_guid, factory_id) = data.get_box_data(guid)
                                     netrefs.setdefault((testbed_guid,guid),set()).add(name)
-        self._program_testbed_instances(element_guids, data)
+        if not recover:
+            self._program_testbed_instances(element_guids, data)
 
     def _program_testbed_instances(self, element_guids, data):
         for guid in element_guids:
index 35079ec..b7486b9 100644 (file)
@@ -8,6 +8,8 @@ from nepi.util.constants import TIME_NOW
 import getpass
 import sys
 import time
+import tempfile
+import shutil
 
 # PROTOCOL REPLIES
 OK = 0
@@ -42,6 +44,7 @@ STATUS  = 26
 GUIDS  = 27
 GET_ROUTE = 28
 GET_ADDRESS = 29
+RECOVER = 30
 
 # PARAMETER TYPE
 STRING  =  100
@@ -57,6 +60,7 @@ controller_messages = dict({
     FINISHED:   "%d|%s" % (FINISHED, "%d"),
     START:  "%d" % START,
     STOP:   "%d" % STOP,
+    RECOVER : "%d" % RECOVER,
     SHUTDOWN:   "%d" % SHUTDOWN,
     })
 
@@ -98,6 +102,7 @@ instruction_text = dict({
     FINISHED:   "FINISHED",
     START:  "START",
     STOP:   "STOP",
+    RECOVER: "RECOVER",
     SHUTDOWN:   "SHUTDOWN",
     CONFIGURE:  "CONFIGURE",
     CREATE: "CREATE",
@@ -248,32 +253,69 @@ class AccessConfiguration(AttributesMap):
                 allowed = [AccessConfiguration.ERROR_LEVEL,
                     AccessConfiguration.DEBUG_LEVEL],
                 validation_function = validation.is_enum)
+        self.add_attribute(name = "recover",
+                help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.", 
+                type = Attribute.BOOL,
+                value = False,
+                validation_function = validation.is_bool)
+
+class TempDir(object):
+    def __init__(self):
+        self.path = tempfile.mkdtemp()
+    
+    def __del__(self):
+        shutil.rmtree(self.path)
+
+class PermDir(object):
+    def __init__(self, path):
+        self.path = path
 
 def create_controller(xml, access_config = None):
-    mode = None if not access_config else \
-            access_config.get_attribute_value("mode")
+    mode = None if not access_config \
+            else access_config.get_attribute_value("mode")
+    launch = True if not access_config \
+            else not access_config.get_attribute_value("recover")
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
+        if not launch:
+            raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
+        
         from nepi.core.execute import ExperimentController
-        return ExperimentController(xml)
+        
+        if not access_config or not access_config.has_attribute("rootDirectory"):
+            root_dir = TempDir()
+        else:
+            root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
+        controller = ExperimentController(xml, root_dir.path)
+        
+        # inject reference to temporary dir, so that it gets cleaned
+        # up at destruction time.
+        controller._tempdir = root_dir
+        
+        return controller
     elif mode == AccessConfiguration.MODE_DAEMON:
         (root_dir, log_level, user, host, port, agent) = \
                 get_access_config_params(access_config)
         return ExperimentControllerProxy(root_dir, log_level,
                 experiment_xml = xml, host = host, port = port, user = user, 
-                agent = agent)
-    raise RuntimeError("Unsupported access configuration 'mode'" % mode)
+                agent = agent, launch = launch)
+    raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
 def create_testbed_instance(testbed_id, testbed_version, access_config):
-    mode = None if not access_config else access_config.get_attribute_value("mode")
+    mode = None if not access_config \
+            else access_config.get_attribute_value("mode")
+    launch = True if not access_config \
+            else not access_config.get_attribute_value("recover")
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
+        if not launch:
+            raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
         return  _build_testbed_instance(testbed_id, testbed_version)
     elif mode == AccessConfiguration.MODE_DAEMON:
         (root_dir, log_level, user, host, port, agent) = \
                 get_access_config_params(access_config)
         return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id, 
                 testbed_version = testbed_version, host = host, port = port,
-                user = user, agent = agent)
-    raise RuntimeError("Unsupported access configuration 'mode'" % mode)
+                user = user, agent = agent, launch = launch)
+    raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
 def _build_testbed_instance(testbed_id, testbed_version):
     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
@@ -537,36 +579,43 @@ class ExperimentControllerServer(server.Server):
 
     def post_daemonize(self):
         from nepi.core.execute import ExperimentController
-        self._controller = ExperimentController(self._experiment_xml)
+        self._controller = ExperimentController(self._experiment_xml, 
+            root_dir = self._root_dir)
 
     def reply_action(self, msg):
-        params = msg.split("|")
-        instruction = int(params[0])
-        log_msg(self, params)
-        try:
-            if instruction == XML:
-                reply = self.experiment_xml(params)
-            elif instruction == ACCESS:
-                reply = self.set_access_configuration(params)
-            elif instruction == TRACE:
-                reply = self.trace(params)
-            elif instruction == FINISHED:
-                reply = self.is_finished(params)
-            elif instruction == START:
-                reply = self.start(params)
-            elif instruction == STOP:
-                reply = self.stop(params)
-            elif instruction == SHUTDOWN:
-                reply = self.shutdown(params)
-            else:
-                error = "Invalid instruction %s" % instruction
-                self.log_error(error)
+        if not msg:
+            result = base64.b64encode("Invalid command line")
+            reply = "%d|%s" % (ERROR, result)
+        else:
+            params = msg.split("|")
+            instruction = int(params[0])
+            log_msg(self, params)
+            try:
+                if instruction == XML:
+                    reply = self.experiment_xml(params)
+                elif instruction == ACCESS:
+                    reply = self.set_access_configuration(params)
+                elif instruction == TRACE:
+                    reply = self.trace(params)
+                elif instruction == FINISHED:
+                    reply = self.is_finished(params)
+                elif instruction == START:
+                    reply = self.start(params)
+                elif instruction == STOP:
+                    reply = self.stop(params)
+                elif instruction == RECOVER:
+                    reply = self.recover(params)
+                elif instruction == SHUTDOWN:
+                    reply = self.shutdown(params)
+                else:
+                    error = "Invalid instruction %s" % instruction
+                    self.log_error(error)
+                    result = base64.b64encode(error)
+                    reply = "%d|%s" % (ERROR, result)
+            except:
+                error = self.log_error()
                 result = base64.b64encode(error)
                 reply = "%d|%s" % (ERROR, result)
-        except:
-            error = self.log_error()
-            result = base64.b64encode(error)
-            reply = "%d|%s" % (ERROR, result)
         log_reply(self, reply)
         return reply
 
@@ -621,6 +670,10 @@ class ExperimentControllerServer(server.Server):
         self._controller.stop()
         return "%d|%s" % (OK, "")
 
+    def recover(self, params):
+        self._controller.recover()
+        return "%d|%s" % (OK, "")
+
     def shutdown(self, params):
         self._controller.shutdown()
         return "%d|%s" % (OK, "")
@@ -957,6 +1010,7 @@ class TestbedInstanceProxy(object):
         if code == ERROR:
             raise RuntimeError(text)
         self._client.send_stop()
+        self._client.read_reply() # wait for it
 
 class ExperimentControllerProxy(object):
     def __init__(self, root_dir, log_level, experiment_xml = None, 
@@ -1057,6 +1111,16 @@ class ExperimentControllerProxy(object):
         if code == ERROR:
             raise RuntimeError(text)
 
+    def recover(self):
+        msg = controller_messages[RECOVER]
+        self._client.send_msg(msg)
+        reply = self._client.read_reply()
+        result = reply.split("|")
+        code = int(result[0])
+        text =  base64.b64decode(result[1])
+        if code == ERROR:
+            raise RuntimeError(text)
+
     def is_finished(self, guid):
         msg = controller_messages[FINISHED]
         msg = msg % guid
@@ -1079,4 +1143,5 @@ class ExperimentControllerProxy(object):
         if code == ERROR:
             raise RuntimeError(text)
         self._client.send_stop()
+        self._client.read_reply() # wait for it
 
index acda8e7..d8d92eb 100755 (executable)
@@ -19,6 +19,9 @@ class ExecuteTestCase(unittest.TestCase):
         sys.modules["nepi.testbeds.mock.metadata_v01"] = mock.metadata_v01
         sys.modules["nepi.testbeds.mock"] = mock
         self.root_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.root_dir)
     
     def make_test_experiment(self):
         exp_desc = ExperimentDescription()
@@ -135,6 +138,49 @@ class ExecuteTestCase(unittest.TestCase):
         controller.stop()
         controller.shutdown()
 
+    def test_daemonized_all_integration_recovery(self):
+        exp_desc, desc, app, node1, node2, iface1, iface2 = self.make_test_experiment()
+        xml = exp_desc.to_xml()
+        access_config = proxy.AccessConfiguration()
+        access_config.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        access_config.set_attribute_value("rootDirectory", self.root_dir)
+        controller = proxy.create_controller(xml, access_config)
+
+        access_config2 = proxy.AccessConfiguration()
+        access_config2.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        inst_root_dir = os.path.join(self.root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+        controller.set_access_configuration(desc.guid, access_config2)
+
+        controller.start()
+        while not controller.is_finished(app.guid):
+            time.sleep(0.5)
+        fake_result = controller.trace(desc.guid, app.guid, "fake")
+        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+        self.assertTrue(fake_result.startswith(comp_result))
+        
+        # controller dies
+        del controller
+        
+        # recover
+        access_config.set_attribute_value("recover",True)
+        controller = proxy.create_controller(xml, access_config)
+        
+        # test recovery
+        self.assertTrue(controller.is_finished(app.guid))
+        fake_result = controller.trace(desc.guid, app.guid, "fake")
+        self.assertTrue(fake_result.startswith(comp_result))
+        
+        controller.stop()
+        controller.shutdown()
+
     def test_reference_expressions(self):
         exp_desc, desc, app, node1, node2, iface1, iface2 = self.make_test_experiment()
         
@@ -202,9 +248,6 @@ class ExecuteTestCase(unittest.TestCase):
         controller.stop()
         controller.shutdown()
  
-    def tearDown(self):
-        shutil.rmtree(self.root_dir)
-
 if __name__ == '__main__':
     unittest.main()
 
index 88bd79c..2d66f8a 100755 (executable)
@@ -67,7 +67,7 @@ class Ns3IntegrationTestCase(unittest.TestCase):
         app.connector("node").connect(node1.connector("apps"))
 
         xml = exp_desc.to_xml()
-        controller = ExperimentController(xml)
+        controller = ExperimentController(xml, self.root_dir)
         controller.start()
         while not controller.is_finished(app.guid):
             time.sleep(0.5)