Controller resucitation fixes.
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 25 Jul 2011 16:54:00 +0000 (18:54 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 25 Jul 2011 16:54:00 +0000 (18:54 +0200)
src/nepi/core/execute.py
src/nepi/util/proxy.py
src/nepi/util/server.py

index 703eae0..f59461e 100644 (file)
@@ -215,8 +215,13 @@ class ExperimentController(object):
         self._root_dir = root_dir
         self._netreffed_testbeds = set()
         self._guids_in_testbed_cache = dict()
-
-        self.persist_experiment_xml()
+        
+        if experiment_xml is None and root_dir is not None:
+            # Recover
+            self.load_experiment_xml()
+            self.load_execute_xml()
+        else:
+            self.persist_experiment_xml()
 
     @property
     def experiment_design_xml(self):
@@ -247,6 +252,18 @@ class ExperimentController(object):
         f.write(self._experiment_execute_xml)
         f.close()
 
+    def load_experiment_xml(self):
+        xml_path = os.path.join(self._root_dir, "experiment-design.xml")
+        f = open(xml_path, "r")
+        self._experiment_design_xml = f.read()
+        f.close()
+
+    def load_execute_xml(self):
+        xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
+        f = open(xml_path, "r")
+        self._experiment_execute_xml = f.read()
+        f.close()
+
     def trace(self, guid, trace_id, attribute='value'):
         testbed = self._testbed_for_guid(guid)
         if testbed != None:
@@ -412,6 +429,8 @@ class ExperimentController(object):
             Attribute.INTEGER : 'getint',
         }
         
+        TRANSIENT = ('Recover',)
+        
         # deferred import because proxy needs
         # our class definitions to define proxies
         import nepi.util.proxy as proxy
@@ -502,11 +521,17 @@ class ExperimentController(object):
         # reload perviously persisted testbed access configurations
         self._load_testbed_proxies()
         
+        # Parse experiment xml
+        parser = XmlExperimentParser()
+        data = parser.from_xml_to_data(self._experiment_design_xml)
+        
         # recreate testbed proxies by reconnecting only
-        self._init_testbed_controllers(recover = True)
+        self._init_testbed_controllers(data, recover = True)
         
         # another time, for netrefs
-        self._init_testbed_controllers(recover = True)
+        self._init_testbed_controllers(data, recover = True)
+        
+        print >>sys.stderr, "RECOVERED"
 
     def is_finished(self, guid):
         testbed = self._testbed_for_guid(guid)
index c0c7bcc..576aab1 100644 (file)
@@ -149,7 +149,7 @@ def get_access_config_params(access_config):
     environment_setup = (
         access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
         if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
-        else None
+        else ""
     )
     if communication == DC.ACCESS_SSH:
         user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
@@ -210,10 +210,22 @@ def create_experiment_controller(xml, access_config = None):
     elif mode == DC.MODE_DAEMON:
         (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
                 get_access_config_params(access_config)
-        return ExperimentControllerProxy(root_dir, log_level,
+        try:
+            return ExperimentControllerProxy(root_dir, log_level,
                 experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
                 agent = agent, launch = launch,
                 environment_setup = environment_setup)
+        except:
+            if not launch:
+                # Maybe controller died, recover from persisted testbed information if possible
+                controller = ExperimentControllerProxy(root_dir, log_level,
+                    experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
+                    agent = agent, launch = True,
+                    environment_setup = environment_setup)
+                controller.recover()
+                return controller
+            else:
+                raise
     raise RuntimeError("Unsupported access configuration '%s'" % mode)
 
 def create_testbed_controller(testbed_id, testbed_version, access_config):
@@ -1035,9 +1047,6 @@ class ExperimentControllerProxy(BaseProxy):
     def __init__(self, root_dir, log_level, experiment_xml = None, 
             launch = True, host = None, port = None, user = None, 
             ident_key = None, agent = None, environment_setup = ""):
-        if launch and experiment_xml is None:
-            raise RuntimeError("To launch a ExperimentControllerServer a \
-                    xml description of the experiment is required")
         super(ExperimentControllerProxy,self).__init__(
             ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
             root_dir = root_dir,
index 5cc50da..73fa84c 100644 (file)
@@ -75,7 +75,7 @@ def eintr_retry(func):
     return rv
 
 class Server(object):
-    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = None):
+    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
         self._root_dir = root_dir
         self._stop = False
         self._ctrl_sock = None
@@ -178,15 +178,16 @@ class Server(object):
             out,err = envproc.communicate()
 
             # parse new environment
-            environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
+            if out:
+                environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
             
-            # apply to current environment
-            for name, value in environment.iteritems():
-                os.environ[name] = value
-            
-            # apply pythonpath
-            if 'PYTHONPATH' in environment:
-                sys.path = environment['PYTHONPATH'].split(':') + sys.path
+                # apply to current environment
+                for name, value in environment.iteritems():
+                    os.environ[name] = value
+                
+                # apply pythonpath
+                if 'PYTHONPATH' in environment:
+                    sys.path = environment['PYTHONPATH'].split(':') + sys.path
 
         # create control socket
         self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)