From 58e3deb9a5d9c92f20c10b4ed8441898d8152637 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Fri, 6 May 2011 18:42:36 +0200 Subject: [PATCH] Typos, type, environment, synchronization and other small fixes to Proxies --- src/nepi/util/proxy.py | 44 +++++++++++++++++++++------------------- src/nepi/util/server.py | 45 +++++++++++++++++++++++++++++++---------- 2 files changed, 57 insertions(+), 32 deletions(-) diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index cc9c235e..c090bd13 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -103,7 +103,7 @@ testbed_messages = dict({ ACTION: "%d|%s" % (ACTION, "%s|%d|%s"), STATUS: "%d|%s" % (STATUS, "%s"), GUIDS: "%d" % GUIDS, - GET_ATTRIBUTE_LIST: "%d" % GET_ATTRIBUTE_LIST, + GET_ATTRIBUTE_LIST: "%d|%s" % (GET_ATTRIBUTE_LIST,"%d"), TESTBED_ID: "%d" % TESTBED_ID, TESTBED_VERSION: "%d" % TESTBED_VERSION, }) @@ -470,12 +470,10 @@ class TestbedControllerServer(server.Server): guid = int(params[1]) connector_type_name = params[2] cross_guid = int(params[3]) - connector_type_name = params[4] - cross_guid = int(params[5]) - cross_testbed_guid = int(params[6]) - cross_testbed_id = params[7] - cross_factory_id = params[8] - cross_connector_type_name = params[9] + cross_testbed_guid = int(params[4]) + cross_testbed_id = params[5] + cross_factory_id = params[6] + cross_connector_type_name = params[7] self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, cross_testbed_guid, cross_testbed_id, cross_factory_id, cross_connector_type_name) @@ -541,10 +539,10 @@ class TestbedControllerServer(server.Server): return "%d|%s" % (OK, "") def get(self, params): - guid = int(param[1]) + guid = int(params[1]) name = base64.b64decode(params[2]) - value = self._testbed.get(guid, name, time) time = params[3] + value = self._testbed.get(guid, name, time) result = base64.b64encode(str(value)) return "%d|%s" % (OK, result) @@ -713,15 +711,16 @@ class TestbedControllerProxy(object): testbed_id and testbed_version are required") # ssh if host != None: - python_code = "from nepi.util.proxy import \ - TesbedInstanceServer;\ - s = TestbedControllerServer('%s', %d, '%s', '%s');\ - s.run()" % (root_dir, log_level, testbed_id, + python_code = "from nepi.util.proxy import "\ + "TestbedControllerServer;"\ + "s = TestbedControllerServer('%s', %d, '%s', '%s');"\ + "s.run()" % (root_dir, log_level, testbed_id, testbed_version) proc = server.popen_ssh_subprocess(python_code, host = host, port = port, user = user, agent = agent, ident_key = ident_key, - environment_setup = environment_setup) + environment_setup = environment_setup, + waitcommand = True) if proc.poll(): err = proc.stderr.read() raise RuntimeError("Server could not be executed: %s" % \ @@ -734,7 +733,8 @@ class TestbedControllerProxy(object): # connect client to server self._client = server.Client(root_dir, host = host, port = port, - user = user, agent = agent) + user = user, agent = agent, + environment_setup = environment_setup) @property def guids(self): @@ -759,7 +759,7 @@ class TestbedControllerProxy(object): text = base64.b64decode(result[1]) if code == ERROR: raise RuntimeError(text) - return int(text) + return str(text) @property def testbed_version(self): @@ -771,7 +771,7 @@ class TestbedControllerProxy(object): text = base64.b64decode(result[1]) if code == ERROR: raise RuntimeError(text) - return int(text) + return str(text) def defer_configure(self, name, value): msg = testbed_messages[CONFIGURE] @@ -1065,7 +1065,7 @@ class TestbedControllerProxy(object): def status(self, guid = None): msg = testbed_messages[STATUS] - msg = msg % str(guid) + msg = msg % (guid,) self._client.send_msg(msg) reply = self._client.read_reply() result = reply.split("|") @@ -1090,7 +1090,7 @@ class TestbedControllerProxy(object): def get_attribute_list(self, guid): msg = testbed_messages[GET_ATTRIBUTE_LIST] - msg = msg % (guid) + msg = msg % (guid,) self._client.send_msg(msg) reply = self._client.read_reply() result = reply.split("|") @@ -1131,7 +1131,8 @@ class ExperimentControllerProxy(object): proc = server.popen_ssh_subprocess(python_code, host = host, port = port, user = user, agent = agent, ident_key = ident_key, - environment_setup = environment_setup) + environment_setup = environment_setup, + waitcommand = True) if proc.poll(): err = proc.stderr.read() raise RuntimeError("Server could not be executed: %s" % \ @@ -1143,7 +1144,8 @@ class ExperimentControllerProxy(object): # connect client to server self._client = server.Client(root_dir, host = host, port = port, - user = user, agent = agent) + user = user, agent = agent, + environment_setup = environment_setup) @property def experiment_xml(self): diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index 44c64084..1cfcea85 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -4,6 +4,7 @@ import base64 import errno import os +import os.path import resource import select import socket @@ -42,7 +43,12 @@ def shell_escape(s): return s else: # unsafe string - escape - s = s.replace("'","\\'") + def escp(c): + if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",): + return c + else: + return "'$'\\x%02x''" % (ord(c),) + s = ''.join(map(escp,s)) return "'%s'" % (s,) class Server(object): @@ -73,6 +79,11 @@ class Server(object): def daemonize(self): # pipes for process synchronization (r, w) = os.pipe() + + # build root folder + root = os.path.normpath(self._root_dir) + if not os.path.exists(root): + os.makedirs(root, 0755) pid1 = os.fork() if pid1 > 0: @@ -278,11 +289,12 @@ class Forwarder(object): class Client(object): def __init__(self, root_dir = ".", host = None, port = None, user = None, - agent = None): + agent = None, environment_setup = ""): self.root_dir = root_dir self.addr = (host, port) self.user = user self.agent = agent + self.environment_setup = environment_setup self._stopped = False self.connect() @@ -301,8 +313,13 @@ class Client(object): c.forward()" % (root_dir,) if host != None: self._process = popen_ssh_subprocess(python_code, host, port, - user, agent) + user, agent, + environment_setup = self.environment_setup) # popen_ssh_subprocess already waits for readiness + if self._process.poll(): + err = proc.stderr.read() + raise RuntimeError("Client could not be reached: %s" % \ + err) else: self._process = subprocess.Popen( ["python", "-c", python_code], @@ -617,15 +634,16 @@ def popen_ssh_subprocess(python_code, host, port, user, agent, ident_key = None, server_key = None, tty = False, - environment_setup = ""): + environment_setup = "", + waitcommand = False): + cmd = "" if python_path: python_path.replace("'", r"'\''") cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path - else: - cmd = "" + cmd += " ; " if environment_setup: cmd += environment_setup - cmd += " " + cmd += " ; " # Uncomment for debug (to run everything under strace) # We had to verify if strace works (cannot nest them) #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n" @@ -641,9 +659,13 @@ def popen_ssh_subprocess(python_code, host, port, user, agent, cmd += "cmd = base64.b64decode(cmd)\n" # Uncomment for debug #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n" - cmd += "os.write(1, \"OK\\n\")\n" # send a sync message - cmd += "exec(cmd)\n'" - + if not waitcommand: + cmd += "os.write(1, \"OK\\n\")\n" # send a sync message + cmd += "exec(cmd)\n" + if waitcommand: + cmd += "os.write(1, \"OK\\n\")\n" # send a sync message + cmd += "'" + tmp_known_hosts = None args = ['ssh', # Don't bother with localhost. Makes test easier @@ -675,6 +697,7 @@ def popen_ssh_subprocess(python_code, host, port, user, agent, base64.b64encode(python_code) + "\n") msg = os.read(proc.stdout.fileno(), 3) if msg != "OK\n": - raise RuntimeError("Failed to start remote python interpreter") + raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % ( + msg, proc.stdout.read(), proc.stderr.read()) return proc -- 2.47.0