server daemon launched over ssh connection.
[nepi.git] / src / nepi / util / proxy.py
index e3d6edb..c3b98b4 100644 (file)
@@ -5,7 +5,9 @@ import base64
 from nepi.core.attributes import AttributesMap, Attribute
 from nepi.util import server, validation
 from nepi.util.constants import TIME_NOW
+import getpass
 import sys
+import time
 
 # PROTOCOL REPLIES
 OK = 0
@@ -48,7 +50,7 @@ FLOAT   = 103
 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
 controller_messages = dict({
     XML:    "%d" % XML,
-    ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r"),
+    ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
     TRACE:  "%d|%s" % (TRACE, "%d|%d|%s"),
     FINISHED:   "%d|%s" % (FINISHED, "%d"),
     START:  "%d" % START,
@@ -153,6 +155,38 @@ def log_reply(server, reply):
     server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
             code_txt, txt))
 
+def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
+    # launch daemon
+    proc = server.popen_ssh_subprocess(python_code, host = host,
+        port = port, user = user, agent = agent)
+    #while not proc.poll():
+    #    time.sleep(0.5)
+    if proc.poll():
+        err = proc.stderr.read()
+        raise RuntimeError("Client could not be executed: %s" % \
+                err)
+    # create client
+    return server.Client(root_dir, host = host, port = port, user = user, 
+            agent = agent)
+
+def to_server_log_level(log_level):
+    return server.DEBUG_LEVEL \
+            if log_level == AccessConfiguration.DEBUG_LEVEL \
+                else server.ERROR_LEVEL
+
+def get_access_config_params(access_config):
+    root_dir = access_config.get_attribute_value("rootDirectory")
+    log_level = access_config.get_attribute_value("logLevel")
+    log_level = to_server_log_level(log_level)
+    user = host = port = agent = None
+    communication = access_config.get_attribute_value("communication")
+    if communication == AccessConfiguration.ACCESS_SSH:
+        user = access_config.get_attribute_value("user")
+        host = access_config.get_attribute_value("host")
+        port = access_config.get_attribute_value("port")
+        agent = access_config.get_attribute_value("useAgent")
+    return (root_dir, log_level, user, host, port, agent)
+
 class AccessConfiguration(AttributesMap):
     MODE_SINGLE_PROCESS = "SINGLE"
     MODE_DAEMON = "DAEMON"
@@ -185,6 +219,7 @@ class AccessConfiguration(AttributesMap):
         self.add_attribute(name = "user",
                 help = "User on the Host to execute the testbed",
                 type = Attribute.STRING,
+                value = getpass.getuser(),
                 validation_function = validation.is_string)
         self.add_attribute(name = "port",
                 help = "Port on the Host",
@@ -210,28 +245,32 @@ class AccessConfiguration(AttributesMap):
                 validation_function = validation.is_enum)
 
 def create_controller(xml, access_config = None):
-    mode = None if not access_config else access_config.get_attribute_value("mode")
+    mode = None if not access_config else \
+            access_config.get_attribute_value("mode")
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
         from nepi.core.execute import ExperimentController
         return ExperimentController(xml)
     elif mode == AccessConfiguration.MODE_DAEMON:
-        root_dir = access_config.get_attribute_value("rootDirectory")
-        log_level = access_config.get_attribute_value("logLevel")
-        return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml)
+        (root_dir, log_level, user, host, port, agent) = \
+                get_access_config_params(access_config)
+        return ExperimentControllerProxy(root_dir, log_level,
+                experiment_xml = xml, host = host, port = port, user = user, 
+                agent = agent)
     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
 
 def create_testbed_instance(testbed_id, testbed_version, access_config):
     mode = None if not access_config else access_config.get_attribute_value("mode")
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
-        return  _build_testbed_testbed(testbed_id, testbed_version)
+        return  _build_testbed_instance(testbed_id, testbed_version)
     elif mode == AccessConfiguration.MODE_DAEMON:
-        root_dir = access_config.get_attribute_value("rootDirectory")
-        log_level = access_config.get_attribute_value("logLevel")
+        (root_dir, log_level, user, host, port, agent) = \
+                get_access_config_params(access_config)
         return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id, 
-                testbed_version = testbed_version)
+                testbed_version = testbed_version, host = host, port = port,
+                user = user, agent = agent)
     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
 
-def _build_testbed_testbed(testbed_id, testbed_version):
+def _build_testbed_instance(testbed_id, testbed_version):
     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
     if not mod_name in sys.modules:
         __import__(mod_name)
@@ -239,14 +278,14 @@ def _build_testbed_testbed(testbed_id, testbed_version):
     return module.TestbedInstance(testbed_version)
 
 class TestbedInstanceServer(server.Server):
-    def __init__(self, root_dir, testbed_id, testbed_version):
-        super(TestbedInstanceServer, self).__init__(root_dir)
+    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
+        super(TestbedInstanceServer, self).__init__(root_dir, log_level)
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
         self._testbed = None
 
     def post_daemonize(self):
-        self._testbed = _build_testbed_testbed(self._testbed_id, 
+        self._testbed = _build_testbed_instance(self._testbed_id, 
                 self._testbed_version)
 
     def reply_action(self, msg):
@@ -468,8 +507,8 @@ class TestbedInstanceServer(server.Server):
         return "%d|%s" % (OK, result)
  
 class ExperimentControllerServer(server.Server):
-    def __init__(self, root_dir, experiment_xml):
-        super(ExperimentControllerServer, self).__init__(root_dir)
+    def __init__(self, root_dir, log_level, experiment_xml):
+        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
         self._experiment_xml = experiment_xml
         self._controller = None
 
@@ -522,6 +561,7 @@ class ExperimentControllerServer(server.Server):
         port = int(params[6])
         root_dir = params[7]
         use_agent = params[8] == "True"
+        log_level = params[9]
         access_config = AccessConfiguration()
         access_config.set_attribute_value("mode", mode)
         access_config.set_attribute_value("communication", communication)
@@ -530,6 +570,7 @@ class ExperimentControllerServer(server.Server):
         access_config.set_attribute_value("port", port)
         access_config.set_attribute_value("rootDirectory", root_dir)
         access_config.set_attribute_value("useAgent", use_agent)
+        access_config.set_attribute_value("logLevel", log_level)
         self._controller.set_access_configuration(testbed_guid, 
                 access_config)
         return "%d|%s" % (OK, "")
@@ -562,18 +603,28 @@ class ExperimentControllerServer(server.Server):
 
 class TestbedIntanceProxy(object):
     def __init__(self, root_dir, log_level, testbed_id = None, 
-            testbed_version = None, launch = True):
+            testbed_version = None, launch = True, host = None, 
+            port = None, user = None, agent = None):
         if launch:
             if testbed_id == None or testbed_version == None:
                 raise RuntimeError("To launch a TesbedInstance server a \
                         testbed_id and testbed_version are required")
-            # launch daemon
-            s = TestbedInstanceServer(root_dir, testbed_id, testbed_version)
-            if log_level == AccessConfiguration.DEBUG_LEVEL:
-                s.set_debug_log_level()
-            s.run()
-        # create_client
-        self._client = server.Client(root_dir)
+            # ssh
+            if host != None:
+                python_code = "from nepi.util.proxy import \
+                        TesbedInstanceServer;\
+                        s = TestbedInstanceServer('%s', %d, '%s', '%s');\
+                        s.run()" % (root_dir, log_level, testbed_id, 
+                                testbed_version)
+                self._client = launch_ssh_daemon_client(root_dir, python_code,
+                        host, port, user, agent)
+            else:
+                # launch daemon
+                s = TestbedInstanceServer(root_dir, log_level, testbed_id, 
+                    testbed_version)
+                s.run()
+                # create client
+                self._client = server.Client(root_dir)
 
     @property
     def guids(self):
@@ -850,18 +901,31 @@ class TestbedIntanceProxy(object):
         self._client.send_stop()
 
 class ExperimentControllerProxy(object):
-    def __init__(self, root_dir, log_level, experiment_xml = None, launch = True):
+    def __init__(self, root_dir, log_level, experiment_xml = None, 
+            launch = True, host = None, port = None, user = None, 
+            agent = None):
         if launch:
+            # launch server
             if experiment_xml == None:
                 raise RuntimeError("To launch a ExperimentControllerServer a \
                         xml description of the experiment is required")
-            # launch daemon
-            s = ExperimentControllerServer(root_dir, experiment_xml)
-            if log_level == AccessConfiguration.DEBUG_LEVEL:
-                s.set_debug_log_level()
-            s.run()
-        # create_client
-        self._client = server.Client(root_dir)
+            # ssh
+            if host != None:
+                xml = experiment_xml
+                xml = xml.replace("'", r"\'")
+                xml = xml.replace("\"", r"\'")
+                xml = xml.replace("\n", r"")
+                python_code = "from nepi.util.proxy import ExperimentControllerServer;\
+                        s = ExperimentControllerServer('%s', %d, '%s');\
+                        s.run()" % (root_dir, log_level, xml)
+                self._client = launch_ssh_daemon_client(root_dir, python_code,
+                        host, port, user, agent)
+            else:
+                # launch daemon
+                s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
+                s.run()
+                # create client
+                self._client = server.Client(root_dir)
 
     @property
     def experiment_xml(self):
@@ -883,9 +947,10 @@ class ExperimentControllerProxy(object):
         port = access_config.get_attribute_value("port")
         root_dir = access_config.get_attribute_value("rootDirectory")
         use_agent = access_config.get_attribute_value("useAgent")
+        log_level = access_config.get_attribute_value("logLevel")
         msg = controller_messages[ACCESS]
         msg = msg % (testbed_guid, mode, communication, host, user, port, 
-                root_dir, use_agent)
+                root_dir, use_agent, log_level)
         self._client.send_msg(msg)
         reply = self._client.read_reply()
         result = reply.split("|")