From: Alina Quereilhac <alina.quereilhac@inria.fr> Date: Thu, 25 Aug 2011 11:12:54 +0000 (+0200) Subject: Daemonized servers are now always launched with popen, and not directly invoked in... X-Git-Tag: nepi-3.0.0~275 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=96b8e278dfc18f0c7a4aa37158a46f9f9b85ad4b;p=nepi.git Daemonized servers are now always launched with popen, and not directly invoked in the same process. --- diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index 552b4a9f..0a526915 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -14,6 +14,7 @@ import collections import functools import time import logging +logging.basicConfig() ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}") ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}" diff --git a/src/nepi/core/metadata.py b/src/nepi/core/metadata.py index 509d1dec..b96f009b 100644 --- a/src/nepi/core/metadata.py +++ b/src/nepi/core/metadata.py @@ -341,6 +341,17 @@ class Metadata(object): "validation_function" : validation.is_bool, "category" : AC.CATEGORY_DEPLOYMENT, }), + DC.USE_SUDO : dict({ + "name" : DC.USE_SUDO, + "help" : "Use sudo to run the deamon process. This option only take flace when the server runs in daemon mode.", + "type" : Attribute.BOOL, + "value" : False, + "flags" : Attribute.ExecReadOnly |\ + Attribute.ExecImmutable |\ + Attribute.Metadata, + "validation_function" : validation.is_bool, + "category" : AC.CATEGORY_DEPLOYMENT, + }), DC.LOG_LEVEL : dict({ "name" : DC.LOG_LEVEL, "help" : "Log level for instance", diff --git a/src/nepi/util/constants.py b/src/nepi/util/constants.py index be566e79..237dc476 100644 --- a/src/nepi/util/constants.py +++ b/src/nepi/util/constants.py @@ -73,6 +73,7 @@ class DeploymentConfiguration: ROOT_DIRECTORY = "rootDirectory" USE_AGENT = "useAgent" + USE_SUDO = "useSudo" LOG_LEVEL = "logLevel" RECOVER = "recover" RECOVERY_POLICY = "recoveryPolicy" diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index 6b6bf3f6..47a94955 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -150,20 +150,22 @@ def get_access_config_params(access_config): root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY) log_level = access_config.get_attribute_value(DC.LOG_LEVEL) log_level = to_server_log_level(log_level) - user = host = port = agent = key = None + user = host = port = agent = key = sudo = None communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION) environment_setup = ( access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP) if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP) else "" ) - if communication == DC.ACCESS_SSH: - user = access_config.get_attribute_value(DC.DEPLOYMENT_USER) - host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST) - port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT) - agent = access_config.get_attribute_value(DC.USE_AGENT) - key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY) - return (root_dir, log_level, user, host, port, key, agent, environment_setup) + user = access_config.get_attribute_value(DC.DEPLOYMENT_USER) + host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST) + port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT) + agent = access_config.get_attribute_value(DC.USE_AGENT) + sudo = access_config.get_attribute_value(DC.USE_SUDO) + key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY) + communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION) + return (root_dir, log_level, communication, user, host, port, key, agent, + sudo, environment_setup) class AccessConfiguration(AttributesMap): def __init__(self, params = None): @@ -215,19 +217,33 @@ def create_experiment_controller(xml, access_config = None): return controller elif mode == DC.MODE_DAEMON: - (root_dir, log_level, user, host, port, key, agent, environment_setup) = \ - get_access_config_params(access_config) + (root_dir, log_level, communication, user, host, port, key, agent, + sudo, environment_setup) = get_access_config_params(access_config) try: return ExperimentControllerProxy(root_dir, log_level, - experiment_xml = xml, host = host, port = port, user = user, ident_key = key, - agent = agent, launch = launch, + experiment_xml = xml, + communication = communication, + host = host, + port = port, + user = user, + ident_key = key, + agent = agent, + sudo = sudo, + launch = launch, environment_setup = environment_setup) except: if not launch: # Maybe controller died, recover from persisted testbed information if possible controller = ExperimentControllerProxy(root_dir, log_level, - experiment_xml = xml, host = host, port = port, user = user, ident_key = key, - agent = agent, launch = True, + experiment_xml = xml, + communication = communication, + host = host, + port = port, + user = user, + ident_key = key, + agent = agent, + sudo = sudo, + launch = True, environment_setup = environment_setup) controller.recover() return controller @@ -245,11 +261,19 @@ def create_testbed_controller(testbed_id, testbed_version, access_config): raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,) return _build_testbed_controller(testbed_id, testbed_version) elif mode == DC.MODE_DAEMON: - (root_dir, log_level, user, host, port, key, agent, environment_setup) = \ - get_access_config_params(access_config) - return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id, - testbed_version = testbed_version, host = host, port = port, ident_key = key, - user = user, agent = agent, launch = launch, + (root_dir, log_level, communication, user, host, port, key, agent, + sudo, environment_setup) = get_access_config_params(access_config) + return TestbedControllerProxy(root_dir, log_level, + testbed_id = testbed_id, + testbed_version = testbed_version, + communication = communication, + host = host, + port = port, + ident_key = key, + user = user, + agent = agent, + sudo = sudo, + launch = launch, environment_setup = environment_setup) raise RuntimeError("Unsupported access configuration '%s'" % mode) @@ -829,15 +853,18 @@ class BaseProxy(object): _ServerClass = None _ServerClassModule = "nepi.util.proxy" - def __init__(self, - ctor_args, root_dir, - launch = True, host = None, - port = None, user = None, ident_key = None, agent = None, + def __init__(self, ctor_args, root_dir, + launch = True, + communication = DC.ACCESS_LOCAL, + host = None, + port = None, + user = None, + ident_key = None, + agent = None, + sudo = False, environment_setup = ""): if launch: - # ssh - if host != None: - python_code = ( + python_code = ( "from %(classmodule)s import %(classname)s;" "s = %(classname)s%(ctor_args)r;" "s.run()" @@ -846,22 +873,29 @@ class BaseProxy(object): classmodule = self._ServerClassModule, ctor_args = ctor_args ) ) - proc = server.popen_ssh_subprocess(python_code, host = host, - port = port, user = user, agent = agent, - ident_key = ident_key, - environment_setup = environment_setup, - waitcommand = True) - if proc.poll(): - err = proc.stderr.read() - raise RuntimeError, "Server could not be executed: %s" % (err,) - else: - # launch daemon - s = self._ServerClass(*ctor_args) - s.run() - + proc = server.popen_python(python_code, + communication = communication, + host = host, + port = port, + user = user, + agent = agent, + ident_key = ident_key, + sudo = sudo, + environment_setup = environment_setup) + # Wait for the server to be ready, otherwise nobody + # will be able to connect to it + helo = proc.stderr.readline() + if helo != 'SERVER_READY.\n': + raise AssertionError, "Expected 'SERVER_READY.', got %r: %s" % (helo, + helo + proc.stderr.read()) # connect client to server - self._client = server.Client(root_dir, host = host, port = port, - user = user, agent = agent, + self._client = server.Client(root_dir, + communication = communication, + host = host, + port = port, + user = user, + agent = agent, + sudo = sudo, environment_setup = environment_setup) @staticmethod @@ -1052,9 +1086,17 @@ class TestbedControllerProxy(BaseProxy): _ServerClass = TestbedControllerServer - def __init__(self, root_dir, log_level, testbed_id = None, - testbed_version = None, launch = True, host = None, - port = None, user = None, ident_key = None, agent = None, + def __init__(self, root_dir, log_level, + testbed_id = None, + testbed_version = None, + launch = True, + communication = DC.ACCESS_LOCAL, + host = None, + port = None, + user = None, + ident_key = None, + agent = None, + sudo = False, environment_setup = ""): if launch and (testbed_id == None or testbed_version == None): raise RuntimeError("To launch a TesbedControllerServer a " @@ -1062,8 +1104,14 @@ class TestbedControllerProxy(BaseProxy): super(TestbedControllerProxy,self).__init__( ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup), root_dir = root_dir, - launch = launch, host = host, port = port, user = user, - ident_key = ident_key, agent = agent, + launch = launch, + communication = communication, + host = host, + port = port, + user = user, + ident_key = ident_key, + agent = agent, + sudo = sudo, environment_setup = environment_setup) locals().update( BaseProxy._make_stubs( @@ -1082,14 +1130,28 @@ class TestbedControllerProxy(BaseProxy): class ExperimentControllerProxy(BaseProxy): _ServerClass = ExperimentControllerServer - def __init__(self, root_dir, log_level, experiment_xml = None, - launch = True, host = None, port = None, user = None, - ident_key = None, agent = None, environment_setup = ""): + def __init__(self, root_dir, log_level, + experiment_xml = None, + launch = True, + communication = DC.ACCESS_LOCAL, + host = None, + port = None, + user = None, + ident_key = None, + agent = None, + sudo = False, + environment_setup = ""): super(ExperimentControllerProxy,self).__init__( ctor_args = (root_dir, log_level, experiment_xml, environment_setup), root_dir = root_dir, - launch = launch, host = host, port = port, user = user, - ident_key = ident_key, agent = agent, + launch = launch, + communication = communication, + host = host, + port = port, + user = user, + ident_key = ident_key, + agent = agent, + sudo = sudo, environment_setup = environment_setup) locals().update( BaseProxy._make_stubs( diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index 9808b05e..b95f3573 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -1,6 +1,8 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from nepi.util.constants import DeploymentConfiguration as DC + import base64 import errno import os @@ -98,9 +100,11 @@ class Server(object): # first process (the one that did the first fork) returned. os._exit(0) except: + print >>sys.stderr, "SERVER_ERROR." self.log_error() self.cleanup() os._exit(0) + print >>sys.stderr, "SERVER_READY." def daemonize(self): # pipes for process synchronization @@ -307,7 +311,7 @@ class Forwarder(object): def forward(self): self.connect() - print >>sys.stderr, "READY." + print >>sys.stderr, "FORWARDER_READY." while not self._stop: data = self.read_data() self.send_to_server(data) @@ -373,11 +377,14 @@ class Forwarder(object): class Client(object): def __init__(self, root_dir = ".", host = None, port = None, user = None, - agent = None, environment_setup = ""): + agent = None, sudo = False, communication = DC.ACCESS_LOCAL, + environment_setup = ""): self.root_dir = root_dir self.addr = (host, port) self.user = user self.agent = agent + self.sudo = sudo + self.communication = communication self.environment_setup = environment_setup self._stopped = False self._deferreds = collections.deque() @@ -393,31 +400,26 @@ class Client(object): (host, port) = self.addr user = self.user agent = self.agent + sudo = self.sudo + communication = self.communication python_code = "from nepi.util import server;c=server.Forwarder(%r);\ c.forward()" % (root_dir,) - if host != None: - self._process = popen_ssh_subprocess(python_code, host, port, - user, agent, + + self._process = popen_python(python_code, + communication = communication, + host = host, + port = port, + user = user, + agent = agent, + sudo = sudo, environment_setup = self.environment_setup) - # popen_ssh_subprocess already waits for readiness - if self._process.poll(): - err = proc.stderr.read() - raise RuntimeError("Client could not be reached: %s" % \ - err) - else: - self._process = subprocess.Popen( - ["python", "-c", python_code], - stdin = subprocess.PIPE, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE - ) - + # Wait for the forwarder to be ready, otherwise nobody # will be able to connect to it helo = self._process.stderr.readline() - if helo != 'READY.\n': - raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo, + if helo != 'FORWARDER_READY.\n': + raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo, helo + self._process.stderr.read()) def send_msg(self, msg): @@ -520,309 +522,321 @@ def _make_server_key_args(server_key, host, port, args): return tmp_known_hosts def popen_ssh_command(command, host, port, user, agent, - stdin="", - ident_key = None, - server_key = None, - tty = False, - timeout = None, - retry = 0, - err_on_timeout = True): - """ - Executes a remote commands, returns ((stdout,stderr),process) - """ - if TRACE: - print "ssh", host, command + stdin="", + ident_key = None, + server_key = None, + tty = False, + timeout = None, + retry = 0, + err_on_timeout = True): + """ + Executes a remote commands, returns ((stdout,stderr),process) + """ + if TRACE: + print "ssh", host, command + + tmp_known_hosts = None + args = ['ssh', + # Don't bother with localhost. Makes test easier + '-o', 'NoHostAuthenticationForLocalhost=yes', + '-l', user, host] + if agent: + args.append('-A') + if port: + args.append('-p%d' % port) + if ident_key: + args.extend(('-i', ident_key)) + if tty: + args.append('-t') + if server_key: + # Create a temporary server key file + tmp_known_hosts = _make_server_key_args( + server_key, host, port, args) + args.append(command) + + while 1: + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) + + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts + + try: + out, err = _communicate(proc, stdin, timeout, err_on_timeout) + break + except RuntimeError,e: + if retry <= 0: + raise + if TRACE: + print " timedout -> ", e.args + retry -= 1 + + if TRACE: + print " -> ", out, err + + return ((out, err), proc) + +def popen_scp(source, dest, + port = None, + agent = None, + recursive = False, + ident_key = None, + server_key = None): + """ + Copies from/to remote sites. + + Source and destination should have the user and host encoded + as per scp specs. + + If source is a file object, a special mode will be used to + create the remote file with the same contents. + + If dest is a file object, the remote file (source) will be + read and written into dest. + + In these modes, recursive cannot be True. + + Source can be a list of files to copy to a single destination, + in which case it is advised that the destination be a folder. + """ + + if TRACE: + print "scp", source, dest + + if isinstance(source, file) and source.tell() == 0: + source = source.name + elif hasattr(source, 'read'): + tmp = tempfile.NamedTemporaryFile() + while True: + buf = source.read(65536) + if buf: + tmp.write(buf) + else: + break + tmp.seek(0) + source = tmp.name + + if isinstance(source, file) or isinstance(dest, file) \ + or hasattr(source, 'read') or hasattr(dest, 'write'): + assert not recursive + # Parse source/destination as <user>@<server>:<path> + if isinstance(dest, basestring) and ':' in dest: + remspec, path = dest.split(':',1) + elif isinstance(source, basestring) and ':' in source: + remspec, path = source.split(':',1) + else: + raise ValueError, "Both endpoints cannot be local" + user,host = remspec.rsplit('@',1) tmp_known_hosts = None - args = ['ssh', + + args = ['ssh', '-l', user, '-C', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes', - '-l', user, host] - if agent: - args.append('-A') + host ] if port: - args.append('-p%d' % port) + args.append('-P%d' % port) if ident_key: args.extend(('-i', ident_key)) - if tty: - args.append('-t') if server_key: # Create a temporary server key file tmp_known_hosts = _make_server_key_args( server_key, host, port, args) - args.append(command) - - while 1: - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - - try: - out, err = _communicate(proc, stdin, timeout, err_on_timeout) - break - except RuntimeError,e: - if retry <= 0: - raise - if TRACE: - print " timedout -> ", e.args - retry -= 1 - - if TRACE: - print " -> ", out, err - - return ((out, err), proc) - -def popen_scp(source, dest, - port = None, - agent = None, - recursive = False, - ident_key = None, - server_key = None): - """ - Copies from/to remote sites. - - Source and destination should have the user and host encoded - as per scp specs. - - If source is a file object, a special mode will be used to - create the remote file with the same contents. - If dest is a file object, the remote file (source) will be - read and written into dest. - - In these modes, recursive cannot be True. - - Source can be a list of files to copy to a single destination, - in which case it is advised that the destination be a folder. - """ - - if TRACE: - print "scp", source, dest + if isinstance(source, file) or hasattr(source, 'read'): + args.append('cat > %s' % (shell_escape(path),)) + elif isinstance(dest, file) or hasattr(dest, 'write'): + args.append('cat %s' % (shell_escape(path),)) + else: + raise AssertionError, "Unreachable code reached! :-Q" - if isinstance(source, file) and source.tell() == 0: - source = source.name + # connects to the remote host and starts a remote connection + if isinstance(source, file): + proc = subprocess.Popen(args, + stdout = open('/dev/null','w'), + stderr = subprocess.PIPE, + stdin = source) + err = proc.stderr.read() + proc._known_hosts = tmp_known_hosts + eintr_retry(proc.wait)() + return ((None,err), proc) + elif isinstance(dest, file): + proc = subprocess.Popen(args, + stdout = open('/dev/null','w'), + stderr = subprocess.PIPE, + stdin = source) + err = proc.stderr.read() + proc._known_hosts = tmp_known_hosts + eintr_retry(proc.wait)() + return ((None,err), proc) elif hasattr(source, 'read'): - tmp = tempfile.NamedTemporaryFile() + # file-like (but not file) source + proc = subprocess.Popen(args, + stdout = open('/dev/null','w'), + stderr = subprocess.PIPE, + stdin = subprocess.PIPE) + + buf = None + err = [] while True: - buf = source.read(65536) - if buf: - tmp.write(buf) - else: + if not buf: + buf = source.read(4096) + if not buf: + #EOF break - tmp.seek(0) - source = tmp.name - - if isinstance(source, file) or isinstance(dest, file) \ - or hasattr(source, 'read') or hasattr(dest, 'write'): - assert not recursive - - # Parse source/destination as <user>@<server>:<path> - if isinstance(dest, basestring) and ':' in dest: - remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: - remspec, path = source.split(':',1) - else: - raise ValueError, "Both endpoints cannot be local" - user,host = remspec.rsplit('@',1) - tmp_known_hosts = None - - args = ['ssh', '-l', user, '-C', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes', - host ] - if port: - args.append('-P%d' % port) - if ident_key: - args.extend(('-i', ident_key)) - if server_key: - # Create a temporary server key file - tmp_known_hosts = _make_server_key_args( - server_key, host, port, args) - - if isinstance(source, file) or hasattr(source, 'read'): - args.append('cat > %s' % (shell_escape(path),)) - elif isinstance(dest, file) or hasattr(dest, 'write'): - args.append('cat %s' % (shell_escape(path),)) - else: - raise AssertionError, "Unreachable code reached! :-Q" + + rdrdy, wrdy, broken = select.select( + [proc.stderr], + [proc.stdin], + [proc.stderr,proc.stdin]) + + if proc.stderr in rdrdy: + # use os.read for fully unbuffered behavior + err.append(os.read(proc.stderr.fileno(), 4096)) + + if proc.stdin in wrdy: + proc.stdin.write(buf) + buf = None + + if broken: + break + proc.stdin.close() + err.append(proc.stderr.read()) + + proc._known_hosts = tmp_known_hosts + eintr_retry(proc.wait)() + return ((None,''.join(err)), proc) + elif hasattr(dest, 'write'): + # file-like (but not file) dest + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + stdin = open('/dev/null','w')) - # connects to the remote host and starts a remote connection - if isinstance(source, file): - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = source) - err = proc.stderr.read() - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,err), proc) - elif isinstance(dest, file): - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = source) - err = proc.stderr.read() - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,err), proc) - elif hasattr(source, 'read'): - # file-like (but not file) source - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = subprocess.PIPE) + buf = None + err = [] + while True: + rdrdy, wrdy, broken = select.select( + [proc.stderr, proc.stdout], + [], + [proc.stderr, proc.stdout]) - buf = None - err = [] - while True: - if not buf: - buf = source.read(4096) + if proc.stderr in rdrdy: + # use os.read for fully unbuffered behavior + err.append(os.read(proc.stderr.fileno(), 4096)) + + if proc.stdout in rdrdy: + # use os.read for fully unbuffered behavior + buf = os.read(proc.stdout.fileno(), 4096) + dest.write(buf) + if not buf: #EOF break - - rdrdy, wrdy, broken = select.select( - [proc.stderr], - [proc.stdin], - [proc.stderr,proc.stdin]) - - if proc.stderr in rdrdy: - # use os.read for fully unbuffered behavior - err.append(os.read(proc.stderr.fileno(), 4096)) - - if proc.stdin in wrdy: - proc.stdin.write(buf) - buf = None - - if broken: - break - proc.stdin.close() - err.append(proc.stderr.read()) - - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,''.join(err)), proc) - elif hasattr(dest, 'write'): - # file-like (but not file) dest - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - stdin = open('/dev/null','w')) - buf = None - err = [] - while True: - rdrdy, wrdy, broken = select.select( - [proc.stderr, proc.stdout], - [], - [proc.stderr, proc.stdout]) - - if proc.stderr in rdrdy: - # use os.read for fully unbuffered behavior - err.append(os.read(proc.stderr.fileno(), 4096)) - - if proc.stdout in rdrdy: - # use os.read for fully unbuffered behavior - buf = os.read(proc.stdout.fileno(), 4096) - dest.write(buf) - - if not buf: - #EOF - break - - if broken: - break - err.append(proc.stderr.read()) - - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,''.join(err)), proc) - else: - raise AssertionError, "Unreachable code reached! :-Q" - else: - # Parse destination as <user>@<server>:<path> - if isinstance(dest, basestring) and ':' in dest: - remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: - remspec, path = source.split(':',1) - else: - raise ValueError, "Both endpoints cannot be local" - user,host = remspec.rsplit('@',1) - - # plain scp - tmp_known_hosts = None - args = ['scp', '-q', '-p', '-C', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes' ] - if port: - args.append('-P%d' % port) - if recursive: - args.append('-r') - if ident_key: - args.extend(('-i', ident_key)) - if server_key: - # Create a temporary server key file - tmp_known_hosts = _make_server_key_args( - server_key, host, port, args) - if isinstance(source,list): - args.extend(source) - else: - args.append(source) - args.append(dest) - - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) + if broken: + break + err.append(proc.stderr.read()) + proc._known_hosts = tmp_known_hosts - - comm = proc.communicate() eintr_retry(proc.wait)() - return (comm, proc) - -def popen_ssh_subprocess(python_code, host, port, user, agent, + return ((None,''.join(err)), proc) + else: + raise AssertionError, "Unreachable code reached! :-Q" + else: + # Parse destination as <user>@<server>:<path> + if isinstance(dest, basestring) and ':' in dest: + remspec, path = dest.split(':',1) + elif isinstance(source, basestring) and ':' in source: + remspec, path = source.split(':',1) + else: + raise ValueError, "Both endpoints cannot be local" + user,host = remspec.rsplit('@',1) + + # plain scp + tmp_known_hosts = None + args = ['scp', '-q', '-p', '-C', + # Don't bother with localhost. Makes test easier + '-o', 'NoHostAuthenticationForLocalhost=yes' ] + if port: + args.append('-P%d' % port) + if recursive: + args.append('-r') + if ident_key: + args.extend(('-i', ident_key)) + if server_key: + # Create a temporary server key file + tmp_known_hosts = _make_server_key_args( + server_key, host, port, args) + if isinstance(source,list): + args.extend(source) + else: + args.append(source) + args.append(dest) + + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) + proc._known_hosts = tmp_known_hosts + + comm = proc.communicate() + eintr_retry(proc.wait)() + return (comm, proc) + +def decode_and_execute(): + # The python code we want to execute might have characters that + # are not compatible with the 'inline' mode we are using. To avoid + # problems we receive the encoded python code in base64 as a input + # stream and decode it for execution. + import base64, os + cmd = "" + while True: + cmd += os.read(0, 1)# one byte from stdin + if cmd[-1] == "\n": + break + cmd = base64.b64decode(cmd) + # Uncomment for debug + #os.write(2, "Executing python code: %s\n" % cmd) + os.write(1, "OK\n") # send a sync message + exec(cmd) + +def popen_python(python_code, + communication = DC.ACCESS_LOCAL, + host = None, + port = None, + user = None, + agent = False, python_path = None, ident_key = None, server_key = None, tty = False, - environment_setup = "", - waitcommand = False): - cmd = "" - if python_path: - python_path.replace("'", r"'\''") - cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path - cmd += " ; " - if environment_setup: - cmd += environment_setup - cmd += " ; " - # Uncomment for debug (to run everything under strace) - # We had to verify if strace works (cannot nest them) - #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n" - #cmd += "$CMD " - #cmd += "strace -f -tt -s 200 -o strace$$.out " - cmd += "python -c '" - cmd += "import base64, os\n" - cmd += "cmd = \"\"\n" - cmd += "while True:\n" - cmd += " cmd += os.read(0, 1)\n" # one byte from stdin - cmd += " if cmd[-1] == \"\\n\": break\n" - cmd += "cmd = base64.b64decode(cmd)\n" - # Uncomment for debug - #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n" - if not waitcommand: - cmd += "os.write(1, \"OK\\n\")\n" # send a sync message - cmd += "exec(cmd)\n" - if waitcommand: - cmd += "os.write(1, \"OK\\n\")\n" # send a sync message - cmd += "'" - + sudo = False, + environment_setup = ""): + + + shell = False + cmd = "" + if python_path: + python_path.replace("'", r"'\''") + cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path + cmd += " ; " + if environment_setup: + cmd += environment_setup + cmd += " ; " + # Uncomment for debug (to run everything under strace) + # We had to verify if strace works (cannot nest them) + #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n" + #cmd += "$CMD " + #cmd += "strace -f -tt -s 200 -o strace$$.out " + cmd += "python -c 'from nepi.util import server; server.decode_and_execute()'" + + if communication == DC.ACCESS_SSH: tmp_known_hosts = None args = ['ssh', # Don't bother with localhost. Makes test easier @@ -841,23 +855,29 @@ def popen_ssh_subprocess(python_code, host, port, user, agent, tmp_known_hosts = _make_server_key_args( server_key, host, port, args) args.append(cmd) + else: + args = [cmd] + shell = True - # connects to the remote host and starts a remote rpyc connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) + # connects to the remote host and starts a remote + proc = subprocess.Popen(args, + shell = shell, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) + + if communication == DC.ACCESS_SSH: proc._known_hosts = tmp_known_hosts - - # send the command to execute - os.write(proc.stdin.fileno(), - base64.b64encode(python_code) + "\n") - msg = os.read(proc.stdout.fileno(), 3) - if msg != "OK\n": - raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % ( - msg, proc.stdout.read(), proc.stderr.read()) - return proc + # send the command to execute + os.write(proc.stdin.fileno(), + base64.b64encode(python_code) + "\n") + msg = os.read(proc.stdout.fileno(), 3) + if msg != "OK\n": + raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % ( + msg, proc.stdout.read(), proc.stderr.read()) + + return proc # POSIX def _communicate(self, input, timeout=None, err_on_timeout=True): diff --git a/test/core/integration.py b/test/core/integration.py index 3f22eb41..eecded0d 100755 --- a/test/core/integration.py +++ b/test/core/integration.py @@ -117,6 +117,12 @@ class ExecuteTestCase(unittest.TestCase): access_config = proxy.AccessConfiguration() access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) + access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, + "export PYTHONPATH=%r:%r:$PYTHONPATH " + "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % ( + os.path.dirname(os.path.dirname(mock.__file__)), + os.path.dirname(os.path.dirname(mock2.__file__)),)) + controller = proxy.create_experiment_controller(xml, access_config) controller.start() @@ -146,6 +152,11 @@ class ExecuteTestCase(unittest.TestCase): desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) desc.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) + desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, + "export PYTHONPATH=%r:%r:$PYTHONPATH " + "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % ( + os.path.dirname(os.path.dirname(mock.__file__)), + os.path.dirname(os.path.dirname(mock2.__file__)),)) xml = exp_desc.to_xml() @@ -182,6 +193,11 @@ class ExecuteTestCase(unittest.TestCase): access_config = proxy.AccessConfiguration() access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) + access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, + "export PYTHONPATH=%r:%r:$PYTHONPATH " + "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % ( + os.path.dirname(os.path.dirname(mock.__file__)), + os.path.dirname(os.path.dirname(mock2.__file__)),)) controller = proxy.create_experiment_controller(xml, access_config) controller.start() @@ -229,6 +245,11 @@ class ExecuteTestCase(unittest.TestCase): access_config = proxy.AccessConfiguration() access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) + access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, + "export PYTHONPATH=%r:%r:$PYTHONPATH " + "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % ( + os.path.dirname(os.path.dirname(mock.__file__)), + os.path.dirname(os.path.dirname(mock2.__file__)),)) controller = proxy.create_experiment_controller(xml, access_config) controller.start() @@ -327,9 +348,6 @@ class ExecuteTestCase(unittest.TestCase): controller.shutdown() def test_ssh_daemonized_integration(self): - # TODO: This test doesn't run because - # sys.modules["nepi.testbeds.mock"] = mock - # is not set in the ssh process exp_desc, desc, app, node1, node2, iface1, iface2 = self.make_test_experiment() env = test_util.test_environment() @@ -337,12 +355,6 @@ class ExecuteTestCase(unittest.TestCase): inst_root_dir = os.path.join(self.root_dir, "instance") os.mkdir(inst_root_dir) desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) - desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, - "export PYTHONPATH=%r:%r:$PYTHONPATH ; " - "export NEPI_TESTBEDS='mock:mock mock2:mock2' ; " % ( - os.path.dirname(os.path.dirname(mock.__file__)), - os.path.dirname(os.path.dirname(mock2.__file__)),)) - xml = exp_desc.to_xml() access_config = proxy.AccessConfiguration() @@ -351,6 +363,11 @@ class ExecuteTestCase(unittest.TestCase): access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port) access_config.set_attribute_value(DC.USE_AGENT, True) + access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, + "export PYTHONPATH=%r:%r:$PYTHONPATH " + "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % ( + os.path.dirname(os.path.dirname(mock.__file__)), + os.path.dirname(os.path.dirname(mock2.__file__)),)) controller = proxy.create_experiment_controller(xml, access_config) try: diff --git a/test/testbeds/netns/integration.py b/test/testbeds/netns/integration.py index 72a352f7..4c33e83a 100755 --- a/test/testbeds/netns/integration.py +++ b/test/testbeds/netns/integration.py @@ -17,7 +17,8 @@ class NetnsIntegrationTestCase(unittest.TestCase): def setUp(self): self.root_dir = tempfile.mkdtemp() - def _test_if(self, daemonize_testbed, controller_access_configuration): + def _test_switched(self, controller_access_config = None, + testbed_access_config = None): exp_desc = ExperimentDescription() testbed_id = "netns" user = getpass.getuser() @@ -47,20 +48,15 @@ class NetnsIntegrationTestCase(unittest.TestCase): app.connector("node").connect(node1.connector("apps")) app.enable_trace("stdout") - if daemonize_testbed: - netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) - inst_root_dir = os.path.join(self.root_dir, "instance") - os.mkdir(inst_root_dir) - netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) - netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + if testbed_access_config: + for attr in testbed_access_config.attributes: + if attr.value: + netns_desc.set_attribute_value(attr.name, attr.value) xml = exp_desc.to_xml() - if controller_access_configuration: - controller = proxy.create_experiment_controller(xml, - controller_access_configuration) - else: - controller = ExperimentController(xml, self.root_dir) + controller = proxy.create_experiment_controller(xml, + controller_access_config) try: controller.start() @@ -78,52 +74,112 @@ class NetnsIntegrationTestCase(unittest.TestCase): controller.shutdown() @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") - def test_local_if(self): - self._test_if( - daemonize_testbed = False, - controller_access_configuration = None) + def test_switched(self): + self._test_switched() @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") - def test_all_daemonized_controller(self): + def test_daemonized_controller(self): access_config = proxy.AccessConfiguration() access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) - access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) - self._test_if( - daemonize_testbed = False, - controller_access_configuration = access_config) + self._test_switched(controller_access_config = access_config) @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") - def test_all_daemonized_if(self): + def test_daemonized_tbd(self): access_config = proxy.AccessConfiguration() access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) - access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) - self._test_if( - daemonize_testbed = True, - controller_access_configuration = access_config) + self._test_switched(testbed_access_config = access_config) @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") - def test_all_ssh_daemonized_if(self): + def test_daemonized_all(self): + controller_access_config = proxy.AccessConfiguration() + controller_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + controller_access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) + controller_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + + testbed_access_config = proxy.AccessConfiguration() + testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) + + self._test_switched( + controller_access_config = controller_access_config, + testbed_access_config = testbed_access_config) + + @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") + def test_ssh_daemonized_tbd(self): env = test_util.test_environment() + + testbed_access_config = proxy.AccessConfiguration() + testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) + testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port) + testbed_access_config.set_attribute_value(DC.USE_AGENT, True) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) - access_config = proxy.AccessConfiguration() - access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) - access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) - access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) - access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) - access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port) - access_config.set_attribute_value(DC.USE_AGENT, True) + self._test_switched( + testbed_access_config = testbed_access_config) + + def test_sudo_daemonized_tbd(self): + env = test_util.test_environment() + + testbed_access_config = proxy.AccessConfiguration() + testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) + testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port) + testbed_access_config.set_attribute_value(DC.USE_AGENT, True) + testbed_access_config.set_attribute_value(DC.USE_SUDO, True) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) - self._test_if( - daemonize_testbed = True, - controller_access_configuration = access_config) + self._test_switched( + testbed_access_config = testbed_access_config) + + @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") + def test_ssh_daemonized_all(self): + env = test_util.test_environment() + + controller_access_config = proxy.AccessConfiguration() + controller_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + controller_access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) + controller_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + controller_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) + controller_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port) + controller_access_config.set_attribute_value(DC.USE_AGENT, True) + + testbed_access_config = proxy.AccessConfiguration() + testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + # BUG! IT DOESN'T WORK WITH 2 LEVELS OF SSH! + #testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) + #testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port) + #testbed_access_config.set_attribute_value(DC.USE_AGENT, True) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) + + self._test_switched( + controller_access_config = controller_access_config, + testbed_access_config = testbed_access_config) def tearDown(self): try: - shutil.rmtree(self.root_dir) + #shutil.rmtree(self.root_dir) + pass except: # retry time.sleep(0.1) diff --git a/test/util/server.py b/test/util/server.py index 7e4c6f1c..0bb3a394 100755 --- a/test/util/server.py +++ b/test/util/server.py @@ -1,8 +1,10 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import getpass from nepi.util import server +from nepi.util.constants import DeploymentConfiguration as DC + +import getpass import os import shutil import sys @@ -83,16 +85,43 @@ class ServerTestCase(unittest.TestCase): reply = c.read_reply() self.assertEqual(reply, "Stopping server") + @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") + def test_sudo_server(self): + env = test_util.test_environment() + user = getpass.getuser() + # launch server + python_code = "from nepi.util import server;s=server.Server('%s');\ + s.run()" % self.root_dir + server.popen_python(python_code, + sudo = True) + c = server.Client(self.root_dir, + sudo = True) + c.send_msg("Hola") + reply = c.read_reply() + self.assertEqual(reply, "Reply to: Hola") + c.send_stop() + reply = c.read_reply() + self.assertEqual(reply, "Stopping server") + + def test_ssh_server(self): env = test_util.test_environment() user = getpass.getuser() # launch server python_code = "from nepi.util import server;s=server.Server('%s');\ s.run()" % self.root_dir - server.popen_ssh_subprocess(python_code, host = "localhost", - port = env.port, user = user, agent = True) - c = server.Client(self.root_dir, host = "localhost", port = env.port, - user = user, agent = True) + server.popen_python(python_code, + communication = DC.ACCESS_SSH, + host = "localhost", + port = env.port, + user = user, + agent = True) + c = server.Client(self.root_dir, + communication = DC.ACCESS_SSH, + host = "localhost", + port = env.port, + user = user, + agent = True) c.send_msg("Hola") reply = c.read_reply() self.assertEqual(reply, "Reply to: Hola") @@ -106,11 +135,19 @@ class ServerTestCase(unittest.TestCase): # launch server python_code = "from nepi.util import server;s=server.Server('%s');\ s.run()" % self.root_dir - server.popen_ssh_subprocess(python_code, host = "localhost", - port = env.port, user = user, agent = True) + server.popen_python(python_code, + communication = DC.ACCESS_SSH, + host = "localhost", + port = env.port, + user = user, + agent = True) - c = server.Client(self.root_dir, host = "localhost", port = env.port, - user = user, agent = True) + c = server.Client(self.root_dir, + communication = DC.ACCESS_SSH, + host = "localhost", + port = env.port, + user = user, + agent = True) c.send_msg("Hola") reply = c.read_reply() @@ -120,8 +157,12 @@ class ServerTestCase(unittest.TestCase): del c # reconnect - c = server.Client(self.root_dir, host = "localhost", port = env.port, - user = user, agent = True) + c = server.Client(self.root_dir, + communication = DC.ACCESS_SSH, + host = "localhost", + port = env.port, + user = user, + agent = True) c.send_msg("Hola") reply = c.read_reply() @@ -137,11 +178,19 @@ class ServerTestCase(unittest.TestCase): # launch server python_code = "from nepi.util import server;s=server.Server('%s');\ s.run()" % self.root_dir - server.popen_ssh_subprocess(python_code, host = "localhost", - port = env.port, user = user, agent = True) + server.popen_python(python_code, + communication = DC.ACCESS_SSH, + host = "localhost", + port = env.port, + user = user, + agent = True) - c = server.Client(self.root_dir, host = "localhost", port = env.port, - user = user, agent = True) + c = server.Client(self.root_dir, + communication = DC.ACCESS_SSH, + host = "localhost", + port = env.port, + user = user, + agent = True) c.send_msg("Hola") reply = c.read_reply()