Typos, type, environment, synchronization and other small fixes to Proxies
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 6 May 2011 16:42:36 +0000 (18:42 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 6 May 2011 16:42:36 +0000 (18:42 +0200)
src/nepi/util/proxy.py
src/nepi/util/server.py

index cc9c235..c090bd1 100644 (file)
@@ -103,7 +103,7 @@ testbed_messages = dict({
     ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
     STATUS: "%d|%s" % (STATUS, "%s"),
     GUIDS:  "%d" % GUIDS,
-    GET_ATTRIBUTE_LIST:  "%d" % GET_ATTRIBUTE_LIST,
+    GET_ATTRIBUTE_LIST:  "%d|%s" % (GET_ATTRIBUTE_LIST,"%d"),
     TESTBED_ID:  "%d" % TESTBED_ID,
     TESTBED_VERSION:  "%d" % TESTBED_VERSION,
    })
@@ -470,12 +470,10 @@ class TestbedControllerServer(server.Server):
         guid = int(params[1])
         connector_type_name = params[2]
         cross_guid = int(params[3])
-        connector_type_name = params[4]
-        cross_guid = int(params[5])
-        cross_testbed_guid = int(params[6])
-        cross_testbed_id = params[7]
-        cross_factory_id = params[8]
-        cross_connector_type_name = params[9]
+        cross_testbed_guid = int(params[4])
+        cross_testbed_id = params[5]
+        cross_factory_id = params[6]
+        cross_connector_type_name = params[7]
         self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
             cross_connector_type_name)
@@ -541,10 +539,10 @@ class TestbedControllerServer(server.Server):
         return "%d|%s" % (OK, "")
 
     def get(self, params):
-        guid = int(param[1])
+        guid = int(params[1])
         name = base64.b64decode(params[2])
-        value = self._testbed.get(guid, name, time)
         time = params[3]
+        value = self._testbed.get(guid, name, time)
         result = base64.b64encode(str(value))
         return "%d|%s" % (OK, result)
 
@@ -713,15 +711,16 @@ class TestbedControllerProxy(object):
                         testbed_id and testbed_version are required")
             # ssh
             if host != None:
-                python_code = "from nepi.util.proxy import \
-                        TesbedInstanceServer;\
-                        s = TestbedControllerServer('%s', %d, '%s', '%s');\
-                        s.run()" % (root_dir, log_level, testbed_id, 
+                python_code = "from nepi.util.proxy import "\
+                        "TestbedControllerServer;"\
+                        "s = TestbedControllerServer('%s', %d, '%s', '%s');"\
+                        "s.run()" % (root_dir, log_level, testbed_id, 
                                 testbed_version)
                 proc = server.popen_ssh_subprocess(python_code, host = host,
                     port = port, user = user, agent = agent,
                     ident_key = ident_key,
-                    environment_setup = environment_setup)
+                    environment_setup = environment_setup,
+                    waitcommand = True)
                 if proc.poll():
                     err = proc.stderr.read()
                     raise RuntimeError("Server could not be executed: %s" % \
@@ -734,7 +733,8 @@ class TestbedControllerProxy(object):
 
         # connect client to server
         self._client = server.Client(root_dir, host = host, port = port, 
-                user = user, agent = agent)
+                user = user, agent = agent, 
+                environment_setup = environment_setup)
 
     @property
     def guids(self):
@@ -759,7 +759,7 @@ class TestbedControllerProxy(object):
         text = base64.b64decode(result[1])
         if code == ERROR:
             raise RuntimeError(text)
-        return int(text)
+        return str(text)
 
     @property
     def testbed_version(self):
@@ -771,7 +771,7 @@ class TestbedControllerProxy(object):
         text = base64.b64decode(result[1])
         if code == ERROR:
             raise RuntimeError(text)
-        return int(text)
+        return str(text)
 
     def defer_configure(self, name, value):
         msg = testbed_messages[CONFIGURE]
@@ -1065,7 +1065,7 @@ class TestbedControllerProxy(object):
 
     def status(self, guid = None):
         msg = testbed_messages[STATUS]
-        msg = msg % str(guid)
+        msg = msg % (guid,)
         self._client.send_msg(msg)
         reply = self._client.read_reply()
         result = reply.split("|")
@@ -1090,7 +1090,7 @@ class TestbedControllerProxy(object):
 
     def get_attribute_list(self, guid):
         msg = testbed_messages[GET_ATTRIBUTE_LIST]
-        msg = msg % (guid)
+        msg = msg % (guid,)
         self._client.send_msg(msg)
         reply = self._client.read_reply()
         result = reply.split("|")
@@ -1131,7 +1131,8 @@ class ExperimentControllerProxy(object):
                 proc = server.popen_ssh_subprocess(python_code, host = host,
                     port = port, user = user, agent = agent,
                     ident_key = ident_key,
-                    environment_setup = environment_setup)
+                    environment_setup = environment_setup,
+                    waitcommand = True)
                 if proc.poll():
                     err = proc.stderr.read()
                     raise RuntimeError("Server could not be executed: %s" % \
@@ -1143,7 +1144,8 @@ class ExperimentControllerProxy(object):
 
         # connect client to server
         self._client = server.Client(root_dir, host = host, port = port, 
-                user = user, agent = agent)
+                user = user, agent = agent,
+                environment_setup = environment_setup)
 
     @property
     def experiment_xml(self):
index 44c6408..1cfcea8 100644 (file)
@@ -4,6 +4,7 @@
 import base64
 import errno
 import os
+import os.path
 import resource
 import select
 import socket
@@ -42,7 +43,12 @@ def shell_escape(s):
         return s
     else:
         # unsafe string - escape
-        s = s.replace("'","\\'")
+        def escp(c):
+            if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
+                return c
+            else:
+                return "'$'\\x%02x''" % (ord(c),)
+        s = ''.join(map(escp,s))
         return "'%s'" % (s,)
 
 class Server(object):
@@ -73,6 +79,11 @@ class Server(object):
     def daemonize(self):
         # pipes for process synchronization
         (r, w) = os.pipe()
+        
+        # build root folder
+        root = os.path.normpath(self._root_dir)
+        if not os.path.exists(root):
+            os.makedirs(root, 0755)
 
         pid1 = os.fork()
         if pid1 > 0:
@@ -278,11 +289,12 @@ class Forwarder(object):
 
 class Client(object):
     def __init__(self, root_dir = ".", host = None, port = None, user = None, 
-            agent = None):
+            agent = None, environment_setup = ""):
         self.root_dir = root_dir
         self.addr = (host, port)
         self.user = user
         self.agent = agent
+        self.environment_setup = environment_setup
         self._stopped = False
         self.connect()
     
@@ -301,8 +313,13 @@ class Client(object):
                 c.forward()" % (root_dir,)
         if host != None:
             self._process = popen_ssh_subprocess(python_code, host, port, 
-                    user, agent)
+                    user, agent,
+                    environment_setup = self.environment_setup)
             # popen_ssh_subprocess already waits for readiness
+            if self._process.poll():
+                err = proc.stderr.read()
+                raise RuntimeError("Client could not be reached: %s" % \
+                        err)
         else:
             self._process = subprocess.Popen(
                     ["python", "-c", python_code],
@@ -617,15 +634,16 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
         ident_key = None,
         server_key = None,
         tty = False,
-        environment_setup = ""):
+        environment_setup = "",
+        waitcommand = False):
+        cmd = ""
         if python_path:
             python_path.replace("'", r"'\''")
             cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
-        else:
-            cmd = ""
+            cmd += " ; "
         if environment_setup:
             cmd += environment_setup
-            cmd += " "
+            cmd += " "
         # Uncomment for debug (to run everything under strace)
         # We had to verify if strace works (cannot nest them)
         #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
@@ -641,9 +659,13 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
         cmd += "cmd = base64.b64decode(cmd)\n"
         # Uncomment for debug
         #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
-        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
-        cmd += "exec(cmd)\n'"
-
+        if not waitcommand:
+            cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
+        cmd += "exec(cmd)\n"
+        if waitcommand:
+            cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
+        cmd += "'"
+        
         tmp_known_hosts = None
         args = ['ssh',
                 # Don't bother with localhost. Makes test easier
@@ -675,6 +697,7 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
                 base64.b64encode(python_code) + "\n")
         msg = os.read(proc.stdout.fileno(), 3)
         if msg != "OK\n":
-            raise RuntimeError("Failed to start remote python interpreter")
+            raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
+                msg, proc.stdout.read(), proc.stderr.read())
         return proc