X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Futil%2Fproxy.py;h=c3b98b47a96b6a1616d3a949129858c75e6f3bd5;hb=b1761d355dd90f719da39eb4265cd136b11fe175;hp=e3d6edbc0d1120e400bb7d0ed4525fc43e55854d;hpb=9b57a0addc5aa3b57bad9f419a6d35e2d9dd1887;p=nepi.git diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index e3d6edbc..c3b98b47 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -5,7 +5,9 @@ import base64 from nepi.core.attributes import AttributesMap, Attribute from nepi.util import server, validation from nepi.util.constants import TIME_NOW +import getpass import sys +import time # PROTOCOL REPLIES OK = 0 @@ -48,7 +50,7 @@ FLOAT = 103 # EXPERIMENT CONTROLER PROTOCOL MESSAGES controller_messages = dict({ XML: "%d" % XML, - ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r"), + ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"), TRACE: "%d|%s" % (TRACE, "%d|%d|%s"), FINISHED: "%d|%s" % (FINISHED, "%d"), START: "%d" % START, @@ -153,6 +155,38 @@ def log_reply(server, reply): server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, code_txt, txt)) +def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent): + # launch daemon + proc = server.popen_ssh_subprocess(python_code, host = host, + port = port, user = user, agent = agent) + #while not proc.poll(): + # time.sleep(0.5) + if proc.poll(): + err = proc.stderr.read() + raise RuntimeError("Client could not be executed: %s" % \ + err) + # create client + return server.Client(root_dir, host = host, port = port, user = user, + agent = agent) + +def to_server_log_level(log_level): + return server.DEBUG_LEVEL \ + if log_level == AccessConfiguration.DEBUG_LEVEL \ + else server.ERROR_LEVEL + +def get_access_config_params(access_config): + root_dir = access_config.get_attribute_value("rootDirectory") + log_level = access_config.get_attribute_value("logLevel") + log_level = to_server_log_level(log_level) + user = host = port = agent = None + communication = access_config.get_attribute_value("communication") + if communication == AccessConfiguration.ACCESS_SSH: + user = access_config.get_attribute_value("user") + host = access_config.get_attribute_value("host") + port = access_config.get_attribute_value("port") + agent = access_config.get_attribute_value("useAgent") + return (root_dir, log_level, user, host, port, agent) + class AccessConfiguration(AttributesMap): MODE_SINGLE_PROCESS = "SINGLE" MODE_DAEMON = "DAEMON" @@ -185,6 +219,7 @@ class AccessConfiguration(AttributesMap): self.add_attribute(name = "user", help = "User on the Host to execute the testbed", type = Attribute.STRING, + value = getpass.getuser(), validation_function = validation.is_string) self.add_attribute(name = "port", help = "Port on the Host", @@ -210,28 +245,32 @@ class AccessConfiguration(AttributesMap): validation_function = validation.is_enum) def create_controller(xml, access_config = None): - mode = None if not access_config else access_config.get_attribute_value("mode") + mode = None if not access_config else \ + access_config.get_attribute_value("mode") if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS: from nepi.core.execute import ExperimentController return ExperimentController(xml) elif mode == AccessConfiguration.MODE_DAEMON: - root_dir = access_config.get_attribute_value("rootDirectory") - log_level = access_config.get_attribute_value("logLevel") - return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml) + (root_dir, log_level, user, host, port, agent) = \ + get_access_config_params(access_config) + return ExperimentControllerProxy(root_dir, log_level, + experiment_xml = xml, host = host, port = port, user = user, + agent = agent) raise RuntimeError("Unsupported access configuration 'mode'" % mode) def create_testbed_instance(testbed_id, testbed_version, access_config): mode = None if not access_config else access_config.get_attribute_value("mode") if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS: - return _build_testbed_testbed(testbed_id, testbed_version) + return _build_testbed_instance(testbed_id, testbed_version) elif mode == AccessConfiguration.MODE_DAEMON: - root_dir = access_config.get_attribute_value("rootDirectory") - log_level = access_config.get_attribute_value("logLevel") + (root_dir, log_level, user, host, port, agent) = \ + get_access_config_params(access_config) return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id, - testbed_version = testbed_version) + testbed_version = testbed_version, host = host, port = port, + user = user, agent = agent) raise RuntimeError("Unsupported access configuration 'mode'" % mode) -def _build_testbed_testbed(testbed_id, testbed_version): +def _build_testbed_instance(testbed_id, testbed_version): mod_name = "nepi.testbeds.%s" % (testbed_id.lower()) if not mod_name in sys.modules: __import__(mod_name) @@ -239,14 +278,14 @@ def _build_testbed_testbed(testbed_id, testbed_version): return module.TestbedInstance(testbed_version) class TestbedInstanceServer(server.Server): - def __init__(self, root_dir, testbed_id, testbed_version): - super(TestbedInstanceServer, self).__init__(root_dir) + def __init__(self, root_dir, log_level, testbed_id, testbed_version): + super(TestbedInstanceServer, self).__init__(root_dir, log_level) self._testbed_id = testbed_id self._testbed_version = testbed_version self._testbed = None def post_daemonize(self): - self._testbed = _build_testbed_testbed(self._testbed_id, + self._testbed = _build_testbed_instance(self._testbed_id, self._testbed_version) def reply_action(self, msg): @@ -468,8 +507,8 @@ class TestbedInstanceServer(server.Server): return "%d|%s" % (OK, result) class ExperimentControllerServer(server.Server): - def __init__(self, root_dir, experiment_xml): - super(ExperimentControllerServer, self).__init__(root_dir) + def __init__(self, root_dir, log_level, experiment_xml): + super(ExperimentControllerServer, self).__init__(root_dir, log_level) self._experiment_xml = experiment_xml self._controller = None @@ -522,6 +561,7 @@ class ExperimentControllerServer(server.Server): port = int(params[6]) root_dir = params[7] use_agent = params[8] == "True" + log_level = params[9] access_config = AccessConfiguration() access_config.set_attribute_value("mode", mode) access_config.set_attribute_value("communication", communication) @@ -530,6 +570,7 @@ class ExperimentControllerServer(server.Server): access_config.set_attribute_value("port", port) access_config.set_attribute_value("rootDirectory", root_dir) access_config.set_attribute_value("useAgent", use_agent) + access_config.set_attribute_value("logLevel", log_level) self._controller.set_access_configuration(testbed_guid, access_config) return "%d|%s" % (OK, "") @@ -562,18 +603,28 @@ class ExperimentControllerServer(server.Server): class TestbedIntanceProxy(object): def __init__(self, root_dir, log_level, testbed_id = None, - testbed_version = None, launch = True): + testbed_version = None, launch = True, host = None, + port = None, user = None, agent = None): if launch: if testbed_id == None or testbed_version == None: raise RuntimeError("To launch a TesbedInstance server a \ testbed_id and testbed_version are required") - # launch daemon - s = TestbedInstanceServer(root_dir, testbed_id, testbed_version) - if log_level == AccessConfiguration.DEBUG_LEVEL: - s.set_debug_log_level() - s.run() - # create_client - self._client = server.Client(root_dir) + # ssh + if host != None: + python_code = "from nepi.util.proxy import \ + TesbedInstanceServer;\ + s = TestbedInstanceServer('%s', %d, '%s', '%s');\ + s.run()" % (root_dir, log_level, testbed_id, + testbed_version) + self._client = launch_ssh_daemon_client(root_dir, python_code, + host, port, user, agent) + else: + # launch daemon + s = TestbedInstanceServer(root_dir, log_level, testbed_id, + testbed_version) + s.run() + # create client + self._client = server.Client(root_dir) @property def guids(self): @@ -850,18 +901,31 @@ class TestbedIntanceProxy(object): self._client.send_stop() class ExperimentControllerProxy(object): - def __init__(self, root_dir, log_level, experiment_xml = None, launch = True): + def __init__(self, root_dir, log_level, experiment_xml = None, + launch = True, host = None, port = None, user = None, + agent = None): if launch: + # launch server if experiment_xml == None: raise RuntimeError("To launch a ExperimentControllerServer a \ xml description of the experiment is required") - # launch daemon - s = ExperimentControllerServer(root_dir, experiment_xml) - if log_level == AccessConfiguration.DEBUG_LEVEL: - s.set_debug_log_level() - s.run() - # create_client - self._client = server.Client(root_dir) + # ssh + if host != None: + xml = experiment_xml + xml = xml.replace("'", r"\'") + xml = xml.replace("\"", r"\'") + xml = xml.replace("\n", r"") + python_code = "from nepi.util.proxy import ExperimentControllerServer;\ + s = ExperimentControllerServer('%s', %d, '%s');\ + s.run()" % (root_dir, log_level, xml) + self._client = launch_ssh_daemon_client(root_dir, python_code, + host, port, user, agent) + else: + # launch daemon + s = ExperimentControllerServer(root_dir, log_level, experiment_xml) + s.run() + # create client + self._client = server.Client(root_dir) @property def experiment_xml(self): @@ -883,9 +947,10 @@ class ExperimentControllerProxy(object): port = access_config.get_attribute_value("port") root_dir = access_config.get_attribute_value("rootDirectory") use_agent = access_config.get_attribute_value("useAgent") + log_level = access_config.get_attribute_value("logLevel") msg = controller_messages[ACCESS] msg = msg % (testbed_guid, mode, communication, host, user, port, - root_dir, use_agent) + root_dir, use_agent, log_level) self._client.send_msg(msg) reply = self._client.read_reply() result = reply.split("|")