import functools
import time
import logging
+logging.basicConfig()
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
"validation_function" : validation.is_bool,
"category" : AC.CATEGORY_DEPLOYMENT,
}),
+ DC.USE_SUDO : dict({
+ "name" : DC.USE_SUDO,
+ "help" : "Use sudo to run the deamon process. This option only take flace when the server runs in daemon mode.",
+ "type" : Attribute.BOOL,
+ "value" : False,
+ "flags" : Attribute.ExecReadOnly |\
+ Attribute.ExecImmutable |\
+ Attribute.Metadata,
+ "validation_function" : validation.is_bool,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
DC.LOG_LEVEL : dict({
"name" : DC.LOG_LEVEL,
"help" : "Log level for instance",
ROOT_DIRECTORY = "rootDirectory"
USE_AGENT = "useAgent"
+ USE_SUDO = "useSudo"
LOG_LEVEL = "logLevel"
RECOVER = "recover"
RECOVERY_POLICY = "recoveryPolicy"
root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
log_level = to_server_log_level(log_level)
- user = host = port = agent = key = None
+ user = host = port = agent = key = sudo = None
communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
environment_setup = (
access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
else ""
)
- if communication == DC.ACCESS_SSH:
- user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
- host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
- port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
- agent = access_config.get_attribute_value(DC.USE_AGENT)
- key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
- return (root_dir, log_level, user, host, port, key, agent, environment_setup)
+ user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
+ host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
+ port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
+ agent = access_config.get_attribute_value(DC.USE_AGENT)
+ sudo = access_config.get_attribute_value(DC.USE_SUDO)
+ key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
+ communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
+ return (root_dir, log_level, communication, user, host, port, key, agent,
+ sudo, environment_setup)
class AccessConfiguration(AttributesMap):
def __init__(self, params = None):
return controller
elif mode == DC.MODE_DAEMON:
- (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
- get_access_config_params(access_config)
+ (root_dir, log_level, communication, user, host, port, key, agent,
+ sudo, environment_setup) = get_access_config_params(access_config)
try:
return ExperimentControllerProxy(root_dir, log_level,
- experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
- agent = agent, launch = launch,
+ experiment_xml = xml,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
environment_setup = environment_setup)
except:
if not launch:
# Maybe controller died, recover from persisted testbed information if possible
controller = ExperimentControllerProxy(root_dir, log_level,
- experiment_xml = xml, host = host, port = port, user = user, ident_key = key,
- agent = agent, launch = True,
+ experiment_xml = xml,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = True,
environment_setup = environment_setup)
controller.recover()
return controller
raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
return _build_testbed_controller(testbed_id, testbed_version)
elif mode == DC.MODE_DAEMON:
- (root_dir, log_level, user, host, port, key, agent, environment_setup) = \
- get_access_config_params(access_config)
- return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
- testbed_version = testbed_version, host = host, port = port, ident_key = key,
- user = user, agent = agent, launch = launch,
+ (root_dir, log_level, communication, user, host, port, key, agent,
+ sudo, environment_setup) = get_access_config_params(access_config)
+ return TestbedControllerProxy(root_dir, log_level,
+ testbed_id = testbed_id,
+ testbed_version = testbed_version,
+ communication = communication,
+ host = host,
+ port = port,
+ ident_key = key,
+ user = user,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
environment_setup = environment_setup)
raise RuntimeError("Unsupported access configuration '%s'" % mode)
_ServerClass = None
_ServerClassModule = "nepi.util.proxy"
- def __init__(self,
- ctor_args, root_dir,
- launch = True, host = None,
- port = None, user = None, ident_key = None, agent = None,
+ def __init__(self, ctor_args, root_dir,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
environment_setup = ""):
if launch:
- # ssh
- if host != None:
- python_code = (
+ python_code = (
"from %(classmodule)s import %(classname)s;"
"s = %(classname)s%(ctor_args)r;"
"s.run()"
classmodule = self._ServerClassModule,
ctor_args = ctor_args
) )
- proc = server.popen_ssh_subprocess(python_code, host = host,
- port = port, user = user, agent = agent,
- ident_key = ident_key,
- environment_setup = environment_setup,
- waitcommand = True)
- if proc.poll():
- err = proc.stderr.read()
- raise RuntimeError, "Server could not be executed: %s" % (err,)
- else:
- # launch daemon
- s = self._ServerClass(*ctor_args)
- s.run()
-
+ proc = server.popen_python(python_code,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ ident_key = ident_key,
+ sudo = sudo,
+ environment_setup = environment_setup)
+ # Wait for the server to be ready, otherwise nobody
+ # will be able to connect to it
+ helo = proc.stderr.readline()
+ if helo != 'SERVER_READY.\n':
+ raise AssertionError, "Expected 'SERVER_READY.', got %r: %s" % (helo,
+ helo + proc.stderr.read())
# connect client to server
- self._client = server.Client(root_dir, host = host, port = port,
- user = user, agent = agent,
+ self._client = server.Client(root_dir,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ sudo = sudo,
environment_setup = environment_setup)
@staticmethod
_ServerClass = TestbedControllerServer
- def __init__(self, root_dir, log_level, testbed_id = None,
- testbed_version = None, launch = True, host = None,
- port = None, user = None, ident_key = None, agent = None,
+ def __init__(self, root_dir, log_level,
+ testbed_id = None,
+ testbed_version = None,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
environment_setup = ""):
if launch and (testbed_id == None or testbed_version == None):
raise RuntimeError("To launch a TesbedControllerServer a "
super(TestbedControllerProxy,self).__init__(
ctor_args = (root_dir, log_level, testbed_id, testbed_version, environment_setup),
root_dir = root_dir,
- launch = launch, host = host, port = port, user = user,
- ident_key = ident_key, agent = agent,
+ launch = launch,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
environment_setup = environment_setup)
locals().update( BaseProxy._make_stubs(
class ExperimentControllerProxy(BaseProxy):
_ServerClass = ExperimentControllerServer
- def __init__(self, root_dir, log_level, experiment_xml = None,
- launch = True, host = None, port = None, user = None,
- ident_key = None, agent = None, environment_setup = ""):
+ def __init__(self, root_dir, log_level,
+ experiment_xml = None,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = ""):
super(ExperimentControllerProxy,self).__init__(
ctor_args = (root_dir, log_level, experiment_xml, environment_setup),
root_dir = root_dir,
- launch = launch, host = host, port = port, user = user,
- ident_key = ident_key, agent = agent,
+ launch = launch,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
environment_setup = environment_setup)
locals().update( BaseProxy._make_stubs(
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from nepi.util.constants import DeploymentConfiguration as DC
+
import base64
import errno
import os
# first process (the one that did the first fork) returned.
os._exit(0)
except:
+ print >>sys.stderr, "SERVER_ERROR."
self.log_error()
self.cleanup()
os._exit(0)
+ print >>sys.stderr, "SERVER_READY."
def daemonize(self):
# pipes for process synchronization
def forward(self):
self.connect()
- print >>sys.stderr, "READY."
+ print >>sys.stderr, "FORWARDER_READY."
while not self._stop:
data = self.read_data()
self.send_to_server(data)
class Client(object):
def __init__(self, root_dir = ".", host = None, port = None, user = None,
- agent = None, environment_setup = ""):
+ agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
+ environment_setup = ""):
self.root_dir = root_dir
self.addr = (host, port)
self.user = user
self.agent = agent
+ self.sudo = sudo
+ self.communication = communication
self.environment_setup = environment_setup
self._stopped = False
self._deferreds = collections.deque()
(host, port) = self.addr
user = self.user
agent = self.agent
+ sudo = self.sudo
+ communication = self.communication
python_code = "from nepi.util import server;c=server.Forwarder(%r);\
c.forward()" % (root_dir,)
- if host != None:
- self._process = popen_ssh_subprocess(python_code, host, port,
- user, agent,
+
+ self._process = popen_python(python_code,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ sudo = sudo,
environment_setup = self.environment_setup)
- # popen_ssh_subprocess already waits for readiness
- if self._process.poll():
- err = proc.stderr.read()
- raise RuntimeError("Client could not be reached: %s" % \
- err)
- else:
- self._process = subprocess.Popen(
- ["python", "-c", python_code],
- stdin = subprocess.PIPE,
- stdout = subprocess.PIPE,
- stderr = subprocess.PIPE
- )
-
+
# Wait for the forwarder to be ready, otherwise nobody
# will be able to connect to it
helo = self._process.stderr.readline()
- if helo != 'READY.\n':
- raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
+ if helo != 'FORWARDER_READY.\n':
+ raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
helo + self._process.stderr.read())
def send_msg(self, msg):
return tmp_known_hosts
def popen_ssh_command(command, host, port, user, agent,
- stdin="",
- ident_key = None,
- server_key = None,
- tty = False,
- timeout = None,
- retry = 0,
- err_on_timeout = True):
- """
- Executes a remote commands, returns ((stdout,stderr),process)
- """
- if TRACE:
- print "ssh", host, command
+ stdin="",
+ ident_key = None,
+ server_key = None,
+ tty = False,
+ timeout = None,
+ retry = 0,
+ err_on_timeout = True):
+ """
+ Executes a remote commands, returns ((stdout,stderr),process)
+ """
+ if TRACE:
+ print "ssh", host, command
+
+ tmp_known_hosts = None
+ args = ['ssh',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes',
+ '-l', user, host]
+ if agent:
+ args.append('-A')
+ if port:
+ args.append('-p%d' % port)
+ if ident_key:
+ args.extend(('-i', ident_key))
+ if tty:
+ args.append('-t')
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
+ args.append(command)
+
+ while 1:
+ # connects to the remote host and starts a remote connection
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ # attach tempfile object to the process, to make sure the file stays
+ # alive until the process is finished with it
+ proc._known_hosts = tmp_known_hosts
+
+ try:
+ out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+ break
+ except RuntimeError,e:
+ if retry <= 0:
+ raise
+ if TRACE:
+ print " timedout -> ", e.args
+ retry -= 1
+
+ if TRACE:
+ print " -> ", out, err
+
+ return ((out, err), proc)
+
+def popen_scp(source, dest,
+ port = None,
+ agent = None,
+ recursive = False,
+ ident_key = None,
+ server_key = None):
+ """
+ Copies from/to remote sites.
+
+ Source and destination should have the user and host encoded
+ as per scp specs.
+
+ If source is a file object, a special mode will be used to
+ create the remote file with the same contents.
+
+ If dest is a file object, the remote file (source) will be
+ read and written into dest.
+
+ In these modes, recursive cannot be True.
+
+ Source can be a list of files to copy to a single destination,
+ in which case it is advised that the destination be a folder.
+ """
+
+ if TRACE:
+ print "scp", source, dest
+
+ if isinstance(source, file) and source.tell() == 0:
+ source = source.name
+ elif hasattr(source, 'read'):
+ tmp = tempfile.NamedTemporaryFile()
+ while True:
+ buf = source.read(65536)
+ if buf:
+ tmp.write(buf)
+ else:
+ break
+ tmp.seek(0)
+ source = tmp.name
+
+ if isinstance(source, file) or isinstance(dest, file) \
+ or hasattr(source, 'read') or hasattr(dest, 'write'):
+ assert not recursive
+ # Parse source/destination as <user>@<server>:<path>
+ if isinstance(dest, basestring) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, basestring) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError, "Both endpoints cannot be local"
+ user,host = remspec.rsplit('@',1)
tmp_known_hosts = None
- args = ['ssh',
+
+ args = ['ssh', '-l', user, '-C',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
- '-l', user, host]
- if agent:
- args.append('-A')
+ host ]
if port:
- args.append('-p%d' % port)
+ args.append('-P%d' % port)
if ident_key:
args.extend(('-i', ident_key))
- if tty:
- args.append('-t')
if server_key:
# Create a temporary server key file
tmp_known_hosts = _make_server_key_args(
server_key, host, port, args)
- args.append(command)
-
- while 1:
- # connects to the remote host and starts a remote connection
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stdin = subprocess.PIPE,
- stderr = subprocess.PIPE)
-
- # attach tempfile object to the process, to make sure the file stays
- # alive until the process is finished with it
- proc._known_hosts = tmp_known_hosts
-
- try:
- out, err = _communicate(proc, stdin, timeout, err_on_timeout)
- break
- except RuntimeError,e:
- if retry <= 0:
- raise
- if TRACE:
- print " timedout -> ", e.args
- retry -= 1
-
- if TRACE:
- print " -> ", out, err
-
- return ((out, err), proc)
-
-def popen_scp(source, dest,
- port = None,
- agent = None,
- recursive = False,
- ident_key = None,
- server_key = None):
- """
- Copies from/to remote sites.
-
- Source and destination should have the user and host encoded
- as per scp specs.
-
- If source is a file object, a special mode will be used to
- create the remote file with the same contents.
- If dest is a file object, the remote file (source) will be
- read and written into dest.
-
- In these modes, recursive cannot be True.
-
- Source can be a list of files to copy to a single destination,
- in which case it is advised that the destination be a folder.
- """
-
- if TRACE:
- print "scp", source, dest
+ if isinstance(source, file) or hasattr(source, 'read'):
+ args.append('cat > %s' % (shell_escape(path),))
+ elif isinstance(dest, file) or hasattr(dest, 'write'):
+ args.append('cat %s' % (shell_escape(path),))
+ else:
+ raise AssertionError, "Unreachable code reached! :-Q"
- if isinstance(source, file) and source.tell() == 0:
- source = source.name
+ # connects to the remote host and starts a remote connection
+ if isinstance(source, file):
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = source)
+ err = proc.stderr.read()
+ proc._known_hosts = tmp_known_hosts
+ eintr_retry(proc.wait)()
+ return ((None,err), proc)
+ elif isinstance(dest, file):
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = source)
+ err = proc.stderr.read()
+ proc._known_hosts = tmp_known_hosts
+ eintr_retry(proc.wait)()
+ return ((None,err), proc)
elif hasattr(source, 'read'):
- tmp = tempfile.NamedTemporaryFile()
+ # file-like (but not file) source
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = subprocess.PIPE)
+
+ buf = None
+ err = []
while True:
- buf = source.read(65536)
- if buf:
- tmp.write(buf)
- else:
+ if not buf:
+ buf = source.read(4096)
+ if not buf:
+ #EOF
break
- tmp.seek(0)
- source = tmp.name
-
- if isinstance(source, file) or isinstance(dest, file) \
- or hasattr(source, 'read') or hasattr(dest, 'write'):
- assert not recursive
-
- # Parse source/destination as <user>@<server>:<path>
- if isinstance(dest, basestring) and ':' in dest:
- remspec, path = dest.split(':',1)
- elif isinstance(source, basestring) and ':' in source:
- remspec, path = source.split(':',1)
- else:
- raise ValueError, "Both endpoints cannot be local"
- user,host = remspec.rsplit('@',1)
- tmp_known_hosts = None
-
- args = ['ssh', '-l', user, '-C',
- # Don't bother with localhost. Makes test easier
- '-o', 'NoHostAuthenticationForLocalhost=yes',
- host ]
- if port:
- args.append('-P%d' % port)
- if ident_key:
- args.extend(('-i', ident_key))
- if server_key:
- # Create a temporary server key file
- tmp_known_hosts = _make_server_key_args(
- server_key, host, port, args)
-
- if isinstance(source, file) or hasattr(source, 'read'):
- args.append('cat > %s' % (shell_escape(path),))
- elif isinstance(dest, file) or hasattr(dest, 'write'):
- args.append('cat %s' % (shell_escape(path),))
- else:
- raise AssertionError, "Unreachable code reached! :-Q"
+
+ rdrdy, wrdy, broken = select.select(
+ [proc.stderr],
+ [proc.stdin],
+ [proc.stderr,proc.stdin])
+
+ if proc.stderr in rdrdy:
+ # use os.read for fully unbuffered behavior
+ err.append(os.read(proc.stderr.fileno(), 4096))
+
+ if proc.stdin in wrdy:
+ proc.stdin.write(buf)
+ buf = None
+
+ if broken:
+ break
+ proc.stdin.close()
+ err.append(proc.stderr.read())
+
+ proc._known_hosts = tmp_known_hosts
+ eintr_retry(proc.wait)()
+ return ((None,''.join(err)), proc)
+ elif hasattr(dest, 'write'):
+ # file-like (but not file) dest
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE,
+ stdin = open('/dev/null','w'))
- # connects to the remote host and starts a remote connection
- if isinstance(source, file):
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = source)
- err = proc.stderr.read()
- proc._known_hosts = tmp_known_hosts
- eintr_retry(proc.wait)()
- return ((None,err), proc)
- elif isinstance(dest, file):
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = source)
- err = proc.stderr.read()
- proc._known_hosts = tmp_known_hosts
- eintr_retry(proc.wait)()
- return ((None,err), proc)
- elif hasattr(source, 'read'):
- # file-like (but not file) source
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = subprocess.PIPE)
+ buf = None
+ err = []
+ while True:
+ rdrdy, wrdy, broken = select.select(
+ [proc.stderr, proc.stdout],
+ [],
+ [proc.stderr, proc.stdout])
- buf = None
- err = []
- while True:
- if not buf:
- buf = source.read(4096)
+ if proc.stderr in rdrdy:
+ # use os.read for fully unbuffered behavior
+ err.append(os.read(proc.stderr.fileno(), 4096))
+
+ if proc.stdout in rdrdy:
+ # use os.read for fully unbuffered behavior
+ buf = os.read(proc.stdout.fileno(), 4096)
+ dest.write(buf)
+
if not buf:
#EOF
break
-
- rdrdy, wrdy, broken = select.select(
- [proc.stderr],
- [proc.stdin],
- [proc.stderr,proc.stdin])
-
- if proc.stderr in rdrdy:
- # use os.read for fully unbuffered behavior
- err.append(os.read(proc.stderr.fileno(), 4096))
-
- if proc.stdin in wrdy:
- proc.stdin.write(buf)
- buf = None
-
- if broken:
- break
- proc.stdin.close()
- err.append(proc.stderr.read())
-
- proc._known_hosts = tmp_known_hosts
- eintr_retry(proc.wait)()
- return ((None,''.join(err)), proc)
- elif hasattr(dest, 'write'):
- # file-like (but not file) dest
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stderr = subprocess.PIPE,
- stdin = open('/dev/null','w'))
- buf = None
- err = []
- while True:
- rdrdy, wrdy, broken = select.select(
- [proc.stderr, proc.stdout],
- [],
- [proc.stderr, proc.stdout])
-
- if proc.stderr in rdrdy:
- # use os.read for fully unbuffered behavior
- err.append(os.read(proc.stderr.fileno(), 4096))
-
- if proc.stdout in rdrdy:
- # use os.read for fully unbuffered behavior
- buf = os.read(proc.stdout.fileno(), 4096)
- dest.write(buf)
-
- if not buf:
- #EOF
- break
-
- if broken:
- break
- err.append(proc.stderr.read())
-
- proc._known_hosts = tmp_known_hosts
- eintr_retry(proc.wait)()
- return ((None,''.join(err)), proc)
- else:
- raise AssertionError, "Unreachable code reached! :-Q"
- else:
- # Parse destination as <user>@<server>:<path>
- if isinstance(dest, basestring) and ':' in dest:
- remspec, path = dest.split(':',1)
- elif isinstance(source, basestring) and ':' in source:
- remspec, path = source.split(':',1)
- else:
- raise ValueError, "Both endpoints cannot be local"
- user,host = remspec.rsplit('@',1)
-
- # plain scp
- tmp_known_hosts = None
- args = ['scp', '-q', '-p', '-C',
- # Don't bother with localhost. Makes test easier
- '-o', 'NoHostAuthenticationForLocalhost=yes' ]
- if port:
- args.append('-P%d' % port)
- if recursive:
- args.append('-r')
- if ident_key:
- args.extend(('-i', ident_key))
- if server_key:
- # Create a temporary server key file
- tmp_known_hosts = _make_server_key_args(
- server_key, host, port, args)
- if isinstance(source,list):
- args.extend(source)
- else:
- args.append(source)
- args.append(dest)
-
- # connects to the remote host and starts a remote connection
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stdin = subprocess.PIPE,
- stderr = subprocess.PIPE)
+ if broken:
+ break
+ err.append(proc.stderr.read())
+
proc._known_hosts = tmp_known_hosts
-
- comm = proc.communicate()
eintr_retry(proc.wait)()
- return (comm, proc)
-
-def popen_ssh_subprocess(python_code, host, port, user, agent,
+ return ((None,''.join(err)), proc)
+ else:
+ raise AssertionError, "Unreachable code reached! :-Q"
+ else:
+ # Parse destination as <user>@<server>:<path>
+ if isinstance(dest, basestring) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, basestring) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError, "Both endpoints cannot be local"
+ user,host = remspec.rsplit('@',1)
+
+ # plain scp
+ tmp_known_hosts = None
+ args = ['scp', '-q', '-p', '-C',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes' ]
+ if port:
+ args.append('-P%d' % port)
+ if recursive:
+ args.append('-r')
+ if ident_key:
+ args.extend(('-i', ident_key))
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
+ if isinstance(source,list):
+ args.extend(source)
+ else:
+ args.append(source)
+ args.append(dest)
+
+ # connects to the remote host and starts a remote connection
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+ proc._known_hosts = tmp_known_hosts
+
+ comm = proc.communicate()
+ eintr_retry(proc.wait)()
+ return (comm, proc)
+
+def decode_and_execute():
+ # The python code we want to execute might have characters that
+ # are not compatible with the 'inline' mode we are using. To avoid
+ # problems we receive the encoded python code in base64 as a input
+ # stream and decode it for execution.
+ import base64, os
+ cmd = ""
+ while True:
+ cmd += os.read(0, 1)# one byte from stdin
+ if cmd[-1] == "\n":
+ break
+ cmd = base64.b64decode(cmd)
+ # Uncomment for debug
+ #os.write(2, "Executing python code: %s\n" % cmd)
+ os.write(1, "OK\n") # send a sync message
+ exec(cmd)
+
+def popen_python(python_code,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ agent = False,
python_path = None,
ident_key = None,
server_key = None,
tty = False,
- environment_setup = "",
- waitcommand = False):
- cmd = ""
- if python_path:
- python_path.replace("'", r"'\''")
- cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
- cmd += " ; "
- if environment_setup:
- cmd += environment_setup
- cmd += " ; "
- # Uncomment for debug (to run everything under strace)
- # We had to verify if strace works (cannot nest them)
- #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
- #cmd += "$CMD "
- #cmd += "strace -f -tt -s 200 -o strace$$.out "
- cmd += "python -c '"
- cmd += "import base64, os\n"
- cmd += "cmd = \"\"\n"
- cmd += "while True:\n"
- cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
- cmd += " if cmd[-1] == \"\\n\": break\n"
- cmd += "cmd = base64.b64decode(cmd)\n"
- # Uncomment for debug
- #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
- if not waitcommand:
- cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
- cmd += "exec(cmd)\n"
- if waitcommand:
- cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
- cmd += "'"
-
+ sudo = False,
+ environment_setup = ""):
+
+
+ shell = False
+ cmd = ""
+ if python_path:
+ python_path.replace("'", r"'\''")
+ cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
+ cmd += " ; "
+ if environment_setup:
+ cmd += environment_setup
+ cmd += " ; "
+ # Uncomment for debug (to run everything under strace)
+ # We had to verify if strace works (cannot nest them)
+ #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
+ #cmd += "$CMD "
+ #cmd += "strace -f -tt -s 200 -o strace$$.out "
+ cmd += "python -c 'from nepi.util import server; server.decode_and_execute()'"
+
+ if communication == DC.ACCESS_SSH:
tmp_known_hosts = None
args = ['ssh',
# Don't bother with localhost. Makes test easier
tmp_known_hosts = _make_server_key_args(
server_key, host, port, args)
args.append(cmd)
+ else:
+ args = [cmd]
+ shell = True
- # connects to the remote host and starts a remote rpyc connection
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stdin = subprocess.PIPE,
- stderr = subprocess.PIPE)
+ # connects to the remote host and starts a remote
+ proc = subprocess.Popen(args,
+ shell = shell,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ if communication == DC.ACCESS_SSH:
proc._known_hosts = tmp_known_hosts
-
- # send the command to execute
- os.write(proc.stdin.fileno(),
- base64.b64encode(python_code) + "\n")
- msg = os.read(proc.stdout.fileno(), 3)
- if msg != "OK\n":
- raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
- msg, proc.stdout.read(), proc.stderr.read())
- return proc
+ # send the command to execute
+ os.write(proc.stdin.fileno(),
+ base64.b64encode(python_code) + "\n")
+ msg = os.read(proc.stdout.fileno(), 3)
+ if msg != "OK\n":
+ raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
+ msg, proc.stdout.read(), proc.stderr.read())
+
+ return proc
# POSIX
def _communicate(self, input, timeout=None, err_on_timeout=True):
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+ access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "export PYTHONPATH=%r:%r:$PYTHONPATH "
+ "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+ os.path.dirname(os.path.dirname(mock.__file__)),
+ os.path.dirname(os.path.dirname(mock2.__file__)),))
+
controller = proxy.create_experiment_controller(xml, access_config)
controller.start()
desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
desc.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+ desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "export PYTHONPATH=%r:%r:$PYTHONPATH "
+ "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+ os.path.dirname(os.path.dirname(mock.__file__)),
+ os.path.dirname(os.path.dirname(mock2.__file__)),))
xml = exp_desc.to_xml()
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+ access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "export PYTHONPATH=%r:%r:$PYTHONPATH "
+ "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+ os.path.dirname(os.path.dirname(mock.__file__)),
+ os.path.dirname(os.path.dirname(mock2.__file__)),))
controller = proxy.create_experiment_controller(xml, access_config)
controller.start()
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+ access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "export PYTHONPATH=%r:%r:$PYTHONPATH "
+ "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+ os.path.dirname(os.path.dirname(mock.__file__)),
+ os.path.dirname(os.path.dirname(mock2.__file__)),))
controller = proxy.create_experiment_controller(xml, access_config)
controller.start()
controller.shutdown()
def test_ssh_daemonized_integration(self):
- # TODO: This test doesn't run because
- # sys.modules["nepi.testbeds.mock"] = mock
- # is not set in the ssh process
exp_desc, desc, app, node1, node2, iface1, iface2 = self.make_test_experiment()
env = test_util.test_environment()
inst_root_dir = os.path.join(self.root_dir, "instance")
os.mkdir(inst_root_dir)
desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
- "export PYTHONPATH=%r:%r:$PYTHONPATH ; "
- "export NEPI_TESTBEDS='mock:mock mock2:mock2' ; " % (
- os.path.dirname(os.path.dirname(mock.__file__)),
- os.path.dirname(os.path.dirname(mock2.__file__)),))
-
xml = exp_desc.to_xml()
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
access_config.set_attribute_value(DC.USE_AGENT, True)
+ access_config.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "export PYTHONPATH=%r:%r:$PYTHONPATH "
+ "export NEPI_TESTBEDS='mock:mock mock2:mock2' " % (
+ os.path.dirname(os.path.dirname(mock.__file__)),
+ os.path.dirname(os.path.dirname(mock2.__file__)),))
controller = proxy.create_experiment_controller(xml, access_config)
try:
def setUp(self):
self.root_dir = tempfile.mkdtemp()
- def _test_if(self, daemonize_testbed, controller_access_configuration):
+ def _test_switched(self, controller_access_config = None,
+ testbed_access_config = None):
exp_desc = ExperimentDescription()
testbed_id = "netns"
user = getpass.getuser()
app.connector("node").connect(node1.connector("apps"))
app.enable_trace("stdout")
- if daemonize_testbed:
- netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
- inst_root_dir = os.path.join(self.root_dir, "instance")
- os.mkdir(inst_root_dir)
- netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ if testbed_access_config:
+ for attr in testbed_access_config.attributes:
+ if attr.value:
+ netns_desc.set_attribute_value(attr.name, attr.value)
xml = exp_desc.to_xml()
- if controller_access_configuration:
- controller = proxy.create_experiment_controller(xml,
- controller_access_configuration)
- else:
- controller = ExperimentController(xml, self.root_dir)
+ controller = proxy.create_experiment_controller(xml,
+ controller_access_config)
try:
controller.start()
controller.shutdown()
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
- def test_local_if(self):
- self._test_if(
- daemonize_testbed = False,
- controller_access_configuration = None)
+ def test_switched(self):
+ self._test_switched()
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
- def test_all_daemonized_controller(self):
+ def test_daemonized_controller(self):
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
- access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
- self._test_if(
- daemonize_testbed = False,
- controller_access_configuration = access_config)
+ self._test_switched(controller_access_config = access_config)
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
- def test_all_daemonized_if(self):
+ def test_daemonized_tbd(self):
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
- access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- self._test_if(
- daemonize_testbed = True,
- controller_access_configuration = access_config)
+ self._test_switched(testbed_access_config = access_config)
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
- def test_all_ssh_daemonized_if(self):
+ def test_daemonized_all(self):
+ controller_access_config = proxy.AccessConfiguration()
+ controller_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ controller_access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+ controller_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
+ testbed_access_config = proxy.AccessConfiguration()
+ testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+
+ self._test_switched(
+ controller_access_config = controller_access_config,
+ testbed_access_config = testbed_access_config)
+
+ @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+ def test_ssh_daemonized_tbd(self):
env = test_util.test_environment()
+
+ testbed_access_config = proxy.AccessConfiguration()
+ testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+ testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+ testbed_access_config.set_attribute_value(DC.USE_AGENT, True)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- access_config = proxy.AccessConfiguration()
- access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
- access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
- access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
- access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
- access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
- access_config.set_attribute_value(DC.USE_AGENT, True)
+ self._test_switched(
+ testbed_access_config = testbed_access_config)
+
+ def test_sudo_daemonized_tbd(self):
+ env = test_util.test_environment()
+
+ testbed_access_config = proxy.AccessConfiguration()
+ testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+ testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+ testbed_access_config.set_attribute_value(DC.USE_AGENT, True)
+ testbed_access_config.set_attribute_value(DC.USE_SUDO, True)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- self._test_if(
- daemonize_testbed = True,
- controller_access_configuration = access_config)
+ self._test_switched(
+ testbed_access_config = testbed_access_config)
+
+ @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+ def test_ssh_daemonized_all(self):
+ env = test_util.test_environment()
+
+ controller_access_config = proxy.AccessConfiguration()
+ controller_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ controller_access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+ controller_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ controller_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+ controller_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+ controller_access_config.set_attribute_value(DC.USE_AGENT, True)
+
+ testbed_access_config = proxy.AccessConfiguration()
+ testbed_access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ testbed_access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ # BUG! IT DOESN'T WORK WITH 2 LEVELS OF SSH!
+ #testbed_access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+ #testbed_access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+ #testbed_access_config.set_attribute_value(DC.USE_AGENT, True)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ testbed_access_config.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+
+ self._test_switched(
+ controller_access_config = controller_access_config,
+ testbed_access_config = testbed_access_config)
def tearDown(self):
try:
- shutil.rmtree(self.root_dir)
+ #shutil.rmtree(self.root_dir)
+ pass
except:
# retry
time.sleep(0.1)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import getpass
from nepi.util import server
+from nepi.util.constants import DeploymentConfiguration as DC
+
+import getpass
import os
import shutil
import sys
reply = c.read_reply()
self.assertEqual(reply, "Stopping server")
+ @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+ def test_sudo_server(self):
+ env = test_util.test_environment()
+ user = getpass.getuser()
+ # launch server
+ python_code = "from nepi.util import server;s=server.Server('%s');\
+ s.run()" % self.root_dir
+ server.popen_python(python_code,
+ sudo = True)
+ c = server.Client(self.root_dir,
+ sudo = True)
+ c.send_msg("Hola")
+ reply = c.read_reply()
+ self.assertEqual(reply, "Reply to: Hola")
+ c.send_stop()
+ reply = c.read_reply()
+ self.assertEqual(reply, "Stopping server")
+
+
def test_ssh_server(self):
env = test_util.test_environment()
user = getpass.getuser()
# launch server
python_code = "from nepi.util import server;s=server.Server('%s');\
s.run()" % self.root_dir
- server.popen_ssh_subprocess(python_code, host = "localhost",
- port = env.port, user = user, agent = True)
- c = server.Client(self.root_dir, host = "localhost", port = env.port,
- user = user, agent = True)
+ server.popen_python(python_code,
+ communication = DC.ACCESS_SSH,
+ host = "localhost",
+ port = env.port,
+ user = user,
+ agent = True)
+ c = server.Client(self.root_dir,
+ communication = DC.ACCESS_SSH,
+ host = "localhost",
+ port = env.port,
+ user = user,
+ agent = True)
c.send_msg("Hola")
reply = c.read_reply()
self.assertEqual(reply, "Reply to: Hola")
# launch server
python_code = "from nepi.util import server;s=server.Server('%s');\
s.run()" % self.root_dir
- server.popen_ssh_subprocess(python_code, host = "localhost",
- port = env.port, user = user, agent = True)
+ server.popen_python(python_code,
+ communication = DC.ACCESS_SSH,
+ host = "localhost",
+ port = env.port,
+ user = user,
+ agent = True)
- c = server.Client(self.root_dir, host = "localhost", port = env.port,
- user = user, agent = True)
+ c = server.Client(self.root_dir,
+ communication = DC.ACCESS_SSH,
+ host = "localhost",
+ port = env.port,
+ user = user,
+ agent = True)
c.send_msg("Hola")
reply = c.read_reply()
del c
# reconnect
- c = server.Client(self.root_dir, host = "localhost", port = env.port,
- user = user, agent = True)
+ c = server.Client(self.root_dir,
+ communication = DC.ACCESS_SSH,
+ host = "localhost",
+ port = env.port,
+ user = user,
+ agent = True)
c.send_msg("Hola")
reply = c.read_reply()
# launch server
python_code = "from nepi.util import server;s=server.Server('%s');\
s.run()" % self.root_dir
- server.popen_ssh_subprocess(python_code, host = "localhost",
- port = env.port, user = user, agent = True)
+ server.popen_python(python_code,
+ communication = DC.ACCESS_SSH,
+ host = "localhost",
+ port = env.port,
+ user = user,
+ agent = True)
- c = server.Client(self.root_dir, host = "localhost", port = env.port,
- user = user, agent = True)
+ c = server.Client(self.root_dir,
+ communication = DC.ACCESS_SSH,
+ host = "localhost",
+ port = env.port,
+ user = user,
+ agent = True)
c.send_msg("Hola")
reply = c.read_reply()