From d90947ea17f0f4c6e255fb36dd95473176f4b84a Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Mon, 25 Jul 2011 18:54:00 +0200 Subject: [PATCH] Controller resucitation fixes. --- src/nepi/core/execute.py | 33 +++++++++++++++++++++++++++++---- src/nepi/util/proxy.py | 19 ++++++++++++++----- src/nepi/util/server.py | 19 ++++++++++--------- 3 files changed, 53 insertions(+), 18 deletions(-) diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index 703eae00..f59461e3 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -215,8 +215,13 @@ class ExperimentController(object): self._root_dir = root_dir self._netreffed_testbeds = set() self._guids_in_testbed_cache = dict() - - self.persist_experiment_xml() + + if experiment_xml is None and root_dir is not None: + # Recover + self.load_experiment_xml() + self.load_execute_xml() + else: + self.persist_experiment_xml() @property def experiment_design_xml(self): @@ -247,6 +252,18 @@ class ExperimentController(object): f.write(self._experiment_execute_xml) f.close() + def load_experiment_xml(self): + xml_path = os.path.join(self._root_dir, "experiment-design.xml") + f = open(xml_path, "r") + self._experiment_design_xml = f.read() + f.close() + + def load_execute_xml(self): + xml_path = os.path.join(self._root_dir, "experiment-execute.xml") + f = open(xml_path, "r") + self._experiment_execute_xml = f.read() + f.close() + def trace(self, guid, trace_id, attribute='value'): testbed = self._testbed_for_guid(guid) if testbed != None: @@ -412,6 +429,8 @@ class ExperimentController(object): Attribute.INTEGER : 'getint', } + TRANSIENT = ('Recover',) + # deferred import because proxy needs # our class definitions to define proxies import nepi.util.proxy as proxy @@ -502,11 +521,17 @@ class ExperimentController(object): # reload perviously persisted testbed access configurations self._load_testbed_proxies() + # Parse experiment xml + parser = XmlExperimentParser() + data = parser.from_xml_to_data(self._experiment_design_xml) + # recreate testbed proxies by reconnecting only - self._init_testbed_controllers(recover = True) + self._init_testbed_controllers(data, recover = True) # another time, for netrefs - self._init_testbed_controllers(recover = True) + self._init_testbed_controllers(data, recover = True) + + print >>sys.stderr, "RECOVERED" def is_finished(self, guid): testbed = self._testbed_for_guid(guid) diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index c0c7bccd..576aab1b 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -149,7 +149,7 @@ def get_access_config_params(access_config): environment_setup = ( access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP) if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP) - else None + else "" ) if communication == DC.ACCESS_SSH: user = access_config.get_attribute_value(DC.DEPLOYMENT_USER) @@ -210,10 +210,22 @@ def create_experiment_controller(xml, access_config = None): elif mode == DC.MODE_DAEMON: (root_dir, log_level, user, host, port, key, agent, environment_setup) = \ get_access_config_params(access_config) - return ExperimentControllerProxy(root_dir, log_level, + try: + return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml, host = host, port = port, user = user, ident_key = key, agent = agent, launch = launch, environment_setup = environment_setup) + except: + if not launch: + # Maybe controller died, recover from persisted testbed information if possible + controller = ExperimentControllerProxy(root_dir, log_level, + experiment_xml = xml, host = host, port = port, user = user, ident_key = key, + agent = agent, launch = True, + environment_setup = environment_setup) + controller.recover() + return controller + else: + raise raise RuntimeError("Unsupported access configuration '%s'" % mode) def create_testbed_controller(testbed_id, testbed_version, access_config): @@ -1035,9 +1047,6 @@ class ExperimentControllerProxy(BaseProxy): def __init__(self, root_dir, log_level, experiment_xml = None, launch = True, host = None, port = None, user = None, ident_key = None, agent = None, environment_setup = ""): - if launch and experiment_xml is None: - raise RuntimeError("To launch a ExperimentControllerServer a \ - xml description of the experiment is required") super(ExperimentControllerProxy,self).__init__( ctor_args = (root_dir, log_level, experiment_xml, environment_setup), root_dir = root_dir, diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index 5cc50da9..73fa84c0 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -75,7 +75,7 @@ def eintr_retry(func): return rv class Server(object): - def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = None): + def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""): self._root_dir = root_dir self._stop = False self._ctrl_sock = None @@ -178,15 +178,16 @@ class Server(object): out,err = envproc.communicate() # parse new environment - environment = dict(map(lambda x:x.split("\x02"), out.split("\x01"))) + if out: + environment = dict(map(lambda x:x.split("\x02"), out.split("\x01"))) - # apply to current environment - for name, value in environment.iteritems(): - os.environ[name] = value - - # apply pythonpath - if 'PYTHONPATH' in environment: - sys.path = environment['PYTHONPATH'].split(':') + sys.path + # apply to current environment + for name, value in environment.iteritems(): + os.environ[name] = value + + # apply pythonpath + if 'PYTHONPATH' in environment: + sys.path = environment['PYTHONPATH'].split(':') + sys.path # create control socket self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) -- 2.47.0