X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Futil%2Fproxy.py;h=43c22c549af0f46e02bfe78549a2a094611cc76f;hb=e77befaa8e545d7ccc5bef6122331dc12c876bb9;hp=9cb26f347d231b1b3b8ce2cf0529b9b738857a86;hpb=bf3408af80db30702aa06b981d7928b40ebae092;p=nepi.git diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index 9cb26f34..43c22c54 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -3,6 +3,7 @@ import base64 import nepi.core.execute +import nepi.util.environ from nepi.core.attributes import AttributesMap, Attribute from nepi.util import server, validation from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC @@ -13,6 +14,7 @@ import time import tempfile import shutil import functools +import os # PROTOCOL REPLIES OK = 0 @@ -58,11 +60,16 @@ GET_FACTORY_ID = 38 GET_TESTBED_ID = 39 GET_TESTBED_VERSION = 40 TRACES_INFO = 41 +EXEC_XML = 42 +TESTBED_STATUS = 43 +STARTED_TIME = 44 +STOPPED_TIME = 45 instruction_text = dict({ OK: "OK", ERROR: "ERROR", XML: "XML", + EXEC_XML: "EXEC_XML", TRACE: "TRACE", FINISHED: "FINISHED", START: "START", @@ -100,6 +107,9 @@ instruction_text = dict({ TESTBED_ID: "TESTBED_ID", TESTBED_VERSION: "TESTBED_VERSION", TRACES_INFO: "TRACES_INFO", + STARTED_TIME: "STARTED_TIME", + STOPPED_TIME: "STOPPED_TIME", + }) def log_msg(server, params): @@ -131,29 +141,33 @@ def log_reply(server, reply): def to_server_log_level(log_level): return ( - server.DEBUG_LEVEL + DC.DEBUG_LEVEL if log_level == DC.DEBUG_LEVEL - else server.ERROR_LEVEL + else DC.ERROR_LEVEL ) def get_access_config_params(access_config): + mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE) + launch = not access_config.get_attribute_value(DC.RECOVER) root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY) log_level = access_config.get_attribute_value(DC.LOG_LEVEL) log_level = to_server_log_level(log_level) - user = host = port = agent = key = None communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION) environment_setup = ( access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP) if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP) - else None + else "" ) - if communication == DC.ACCESS_SSH: - user = access_config.get_attribute_value(DC.DEPLOYMENT_USER) - host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST) - port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT) - agent = access_config.get_attribute_value(DC.USE_AGENT) - key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY) - return (root_dir, log_level, user, host, port, key, agent, environment_setup) + user = access_config.get_attribute_value(DC.DEPLOYMENT_USER) + host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST) + port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT) + agent = access_config.get_attribute_value(DC.USE_AGENT) + sudo = access_config.get_attribute_value(DC.USE_SUDO) + key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY) + communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION) + clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT) + return (mode, launch, root_dir, log_level, communication, user, host, port, + key, agent, sudo, environment_setup, clean_root) class AccessConfiguration(AttributesMap): def __init__(self, params = None): @@ -161,9 +175,9 @@ class AccessConfiguration(AttributesMap): from nepi.core.metadata import Metadata - for _,attr_info in Metadata.DEPLOYMENT_ATTRIBUTES.iteritems(): + for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems(): self.add_attribute(**attr_info) - + if params: for attr_name, attr_value in params.iteritems(): parser = Attribute.type_parsers[self.get_attribute_type(attr_name)] @@ -182,14 +196,17 @@ class PermDir(object): self.path = path def create_experiment_controller(xml, access_config = None): - mode = None if not access_config \ - else access_config.get_attribute_value(DC.DEPLOYMENT_MODE) - launch = True if not access_config \ - else not access_config.get_attribute_value(DC.RECOVER) + mode = None + launch = True + log_level = DC.ERROR_LEVEL + if access_config: + (mode, launch, root_dir, log_level, communication, user, host, port, + key, agent, sudo, environment_setup, clean_root) \ + = get_access_config_params(access_config) + + os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level + if not mode or mode == DC.MODE_SINGLE_PROCESS: - if not launch: - raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,) - from nepi.core.execute import ExperimentController if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY): @@ -202,40 +219,92 @@ def create_experiment_controller(xml, access_config = None): # up at destruction time. controller._tempdir = root_dir + if not launch: + # try to recover + controller.recover() + return controller elif mode == DC.MODE_DAEMON: - (root_dir, log_level, user, host, port, key, agent, environment_setup) = \ - get_access_config_params(access_config) - return ExperimentControllerProxy(root_dir, log_level, - experiment_xml = xml, host = host, port = port, user = user, ident_key = key, - agent = agent, launch = launch, - environment_setup = environment_setup) + try: + return ExperimentControllerProxy(root_dir, log_level, + experiment_xml = xml, + communication = communication, + host = host, + port = port, + user = user, + ident_key = key, + agent = agent, + sudo = sudo, + launch = launch, + environment_setup = environment_setup, + clean_root = clean_root) + except: + if not launch: + # Maybe controller died, recover from persisted testbed information if possible + controller = ExperimentControllerProxy(root_dir, log_level, + experiment_xml = xml, + communication = communication, + host = host, + port = port, + user = user, + ident_key = key, + agent = agent, + sudo = sudo, + launch = True, + environment_setup = environment_setup, + clean_root = clean_root) + controller.recover() + return controller + else: + raise raise RuntimeError("Unsupported access configuration '%s'" % mode) def create_testbed_controller(testbed_id, testbed_version, access_config): - mode = None if not access_config \ - else access_config.get_attribute_value(DC.DEPLOYMENT_MODE) - launch = True if not access_config \ - else not access_config.get_attribute_value(DC.RECOVER) + mode = None + launch = True + log_level = DC.ERROR_LEVEL + if access_config: + (mode, launch, root_dir, log_level, communication, user, host, port, + key, agent, sudo, environment_setup, clean_root) \ + = get_access_config_params(access_config) + + os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level + if not mode or mode == DC.MODE_SINGLE_PROCESS: if not launch: - raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,) + raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,) return _build_testbed_controller(testbed_id, testbed_version) elif mode == DC.MODE_DAEMON: - (root_dir, log_level, user, host, port, key, agent, environment_setup) = \ - get_access_config_params(access_config) - return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, - testbed_version = testbed_version, host = host, port = port, ident_key = key, - user = user, agent = agent, launch = launch, - environment_setup = environment_setup) + return TestbedControllerProxy(root_dir, log_level, + testbed_id = testbed_id, + testbed_version = testbed_version, + communication = communication, + host = host, + port = port, + ident_key = key, + user = user, + agent = agent, + sudo = sudo, + launch = launch, + environment_setup = environment_setup, + clean_root = clean_root) raise RuntimeError("Unsupported access configuration '%s'" % mode) def _build_testbed_controller(testbed_id, testbed_version): - mod_name = "nepi.testbeds.%s" % (testbed_id.lower()) + mod_name = nepi.util.environ.find_testbed(testbed_id) + if not mod_name in sys.modules: - __import__(mod_name) + try: + __import__(mod_name) + except ImportError: + raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path) + module = sys.modules[mod_name] - return module.TestbedController(testbed_version) + tc = module.TestbedController() + if tc.testbed_version != testbed_version: + raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \ + (testbed_id, testbed_version, tc.testbed_version)) + return tc # Just a namespace class class Marshalling: @@ -251,7 +320,7 @@ class Marshalling: @staticmethod def nullint(sdata): return None if sdata == "None" else int(sdata) - + @staticmethod def bool(sdata): return sdata == 'True' @@ -437,8 +506,10 @@ class BaseServer(server.Server): return reply class TestbedControllerServer(BaseServer): - def __init__(self, root_dir, log_level, testbed_id, testbed_version): - super(TestbedControllerServer, self).__init__(root_dir, log_level) + def __init__(self, root_dir, log_level, testbed_id, testbed_version, + environment_setup, clean_root): + super(TestbedControllerServer, self).__init__(root_dir, log_level, + environment_setup = environment_setup, clean_root = clean_root) self._testbed_id = testbed_id self._testbed_version = testbed_version self._testbed = None @@ -545,17 +616,17 @@ class TestbedControllerServer(BaseServer): self._testbed.defer_add_trace(guid, trace_id) @Marshalling.handles(ADD_ADDRESS) - @Marshalling.args(int, str, int, str) + @Marshalling.args(int, str, int, Marshalling.pickled_data) @Marshalling.retvoid def defer_add_address(self, guid, address, netprefix, broadcast): self._testbed.defer_add_address(guid, address, netprefix, broadcast) @Marshalling.handles(ADD_ROUTE) - @Marshalling.args(int, str, int, str) + @Marshalling.args(int, str, int, str, int) @Marshalling.retvoid - def defer_add_route(self, guid, destination, netprefix, nexthop): - self._testbed.defer_add_route(guid, destination, netprefix, nexthop) + def defer_add_route(self, guid, destination, netprefix, nexthop, metric): + self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric) @Marshalling.handles(DO_SETUP) @Marshalling.args() @@ -647,11 +718,17 @@ class TestbedControllerServer(BaseServer): def status(self, guid): return self._testbed.status(guid) + @Marshalling.handles(TESTBED_STATUS) + @Marshalling.args() + @Marshalling.retval(int) + def testbed_status(self): + return self._testbed.testbed_status() + @Marshalling.handles(GET_ATTRIBUTE_LIST) - @Marshalling.args(int, int) + @Marshalling.args(int, Marshalling.nullint, Marshalling.bool) @Marshalling.retval( Marshalling.pickled_data ) - def get_attribute_list(self, guid, filter_flags = None): - return self._testbed.get_attribute_list(guid, filter_flags) + def get_attribute_list(self, guid, filter_flags = None, exclude = False): + return self._testbed.get_attribute_list(guid, filter_flags, exclude) @Marshalling.handles(GET_FACTORY_ID) @Marshalling.args(int) @@ -659,9 +736,18 @@ class TestbedControllerServer(BaseServer): def get_factory_id(self, guid): return self._testbed.get_factory_id(guid) + @Marshalling.handles(RECOVER) + @Marshalling.args() + @Marshalling.retvoid + def recover(self): + self._testbed.recover() + + class ExperimentControllerServer(BaseServer): - def __init__(self, root_dir, log_level, experiment_xml): - super(ExperimentControllerServer, self).__init__(root_dir, log_level) + def __init__(self, root_dir, log_level, experiment_xml, environment_setup, + clean_root): + super(ExperimentControllerServer, self).__init__(root_dir, log_level, + environment_setup = environment_setup, clean_root = clean_root) self._experiment_xml = experiment_xml self._experiment = None @@ -676,11 +762,29 @@ class ExperimentControllerServer(BaseServer): def guids(self): return self._experiment.guids + @Marshalling.handles(STARTED_TIME) + @Marshalling.args() + @Marshalling.retval( Marshalling.pickled_data ) + def started_time(self): + return self._experiment.started_time + + @Marshalling.handles(STOPPED_TIME) + @Marshalling.args() + @Marshalling.retval( Marshalling.pickled_data ) + def stopped_time(self): + return self._experiment.stopped_time + @Marshalling.handles(XML) @Marshalling.args() @Marshalling.retval() - def experiment_xml(self): - return self._experiment.experiment_xml + def experiment_design_xml(self): + return self._experiment.experiment_design_xml + + @Marshalling.handles(EXEC_XML) + @Marshalling.args() + @Marshalling.retval() + def experiment_execute_xml(self): + return self._experiment.experiment_execute_xml @Marshalling.handles(TRACE) @Marshalling.args(int, str, Marshalling.base64_data) @@ -700,6 +804,12 @@ class ExperimentControllerServer(BaseServer): def is_finished(self, guid): return self._experiment.is_finished(guid) + @Marshalling.handles(STATUS) + @Marshalling.args(int) + @Marshalling.retval(int) + def status(self, guid): + return self._experiment.is_finished(guid) + @Marshalling.handles(GET) @Marshalling.args(int, Marshalling.base64_data, str) @Marshalling.retval( Marshalling.pickled_data ) @@ -758,15 +868,19 @@ class BaseProxy(object): _ServerClass = None _ServerClassModule = "nepi.util.proxy" - def __init__(self, - ctor_args, root_dir, - launch = True, host = None, - port = None, user = None, ident_key = None, agent = None, - environment_setup = ""): + def __init__(self, ctor_args, root_dir, + launch = True, + communication = DC.ACCESS_LOCAL, + host = None, + port = None, + user = None, + ident_key = None, + agent = None, + sudo = False, + environment_setup = "", + clean_root = False): if launch: - # ssh - if host != None: - python_code = ( + python_code = ( "from %(classmodule)s import %(classname)s;" "s = %(classname)s%(ctor_args)r;" "s.run()" @@ -775,22 +889,34 @@ class BaseProxy(object): classmodule = self._ServerClassModule, ctor_args = ctor_args ) ) - proc = server.popen_ssh_subprocess(python_code, host = host, - port = port, user = user, agent = agent, - ident_key = ident_key, - environment_setup = environment_setup, - waitcommand = True) - if proc.poll(): - err = proc.stderr.read() - raise RuntimeError, "Server could not be executed: %s" % (err,) + proc = server.popen_python(python_code, + communication = communication, + host = host, + port = port, + user = user, + agent = agent, + ident_key = ident_key, + sudo = sudo, + environment_setup = environment_setup) + # Wait for the server to be ready, otherwise nobody + # will be able to connect to it + err = [] + helo = "nope" + while helo: + helo = proc.stderr.readline() + if helo == 'SERVER_READY.\n': + break + err.append(helo) else: - # launch daemon - s = self._ServerClass(*ctor_args) - s.run() - + raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),) # connect client to server - self._client = server.Client(root_dir, host = host, port = port, - user = user, agent = agent, + self._client = server.Client(root_dir, + communication = communication, + host = host, + port = port, + user = user, + agent = agent, + sudo = sudo, environment_setup = environment_setup) @staticmethod @@ -981,18 +1107,34 @@ class TestbedControllerProxy(BaseProxy): _ServerClass = TestbedControllerServer - def __init__(self, root_dir, log_level, testbed_id = None, - testbed_version = None, launch = True, host = None, - port = None, user = None, ident_key = None, agent = None, - environment_setup = ""): + def __init__(self, root_dir, log_level, + testbed_id = None, + testbed_version = None, + launch = True, + communication = DC.ACCESS_LOCAL, + host = None, + port = None, + user = None, + ident_key = None, + agent = None, + sudo = False, + environment_setup = "", + clean_root = False): if launch and (testbed_id == None or testbed_version == None): raise RuntimeError("To launch a TesbedControllerServer a " "testbed_id and testbed_version are required") super(TestbedControllerProxy,self).__init__( - ctor_args = (root_dir, log_level, testbed_id, testbed_version), + ctor_args = (root_dir, log_level, testbed_id, testbed_version, + environment_setup, clean_root), root_dir = root_dir, - launch = launch, host = host, port = port, user = user, - ident_key = ident_key, agent = agent, + launch = launch, + communication = communication, + host = host, + port = port, + user = user, + ident_key = ident_key, + agent = agent, + sudo = sudo, environment_setup = environment_setup) locals().update( BaseProxy._make_stubs( @@ -1011,18 +1153,32 @@ class TestbedControllerProxy(BaseProxy): class ExperimentControllerProxy(BaseProxy): _ServerClass = ExperimentControllerServer - def __init__(self, root_dir, log_level, experiment_xml = None, - launch = True, host = None, port = None, user = None, - ident_key = None, agent = None, environment_setup = ""): - if launch and experiment_xml is None: - raise RuntimeError("To launch a ExperimentControllerServer a \ - xml description of the experiment is required") + def __init__(self, root_dir, log_level, + experiment_xml = None, + launch = True, + communication = DC.ACCESS_LOCAL, + host = None, + port = None, + user = None, + ident_key = None, + agent = None, + sudo = False, + environment_setup = "", + clean_root = False): super(ExperimentControllerProxy,self).__init__( - ctor_args = (root_dir, log_level, experiment_xml), + ctor_args = (root_dir, log_level, experiment_xml, environment_setup, + clean_root), root_dir = root_dir, - launch = launch, host = host, port = port, user = user, - ident_key = ident_key, agent = agent, - environment_setup = environment_setup) + launch = launch, + communication = communication, + host = host, + port = port, + user = user, + ident_key = ident_key, + agent = agent, + sudo = sudo, + environment_setup = environment_setup, + clean_root = clean_root) locals().update( BaseProxy._make_stubs( server_class = ExperimentControllerServer,