#!/usr/bin/env python
# -*- coding: utf-8 -*-
+from nepi.util.constants import DeploymentConfiguration as DC
+
import base64
import errno
import os
+import os.path
import resource
import select
import socket
+import signal
import sys
import subprocess
import threading
import signal
import re
import tempfile
+import defer
+import functools
+import collections
CTRL_SOCK = "ctrl.sock"
STD_ERR = "stderr.log"
else:
DEV_NULL = "/dev/null"
-
-
SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
def shell_escape(s):
return s
else:
# unsafe string - escape
- s = s.replace("'","\\'")
+ def escp(c):
+ if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",):
+ return c
+ else:
+ return "'$'\\x%02x''" % (ord(c),)
+ s = ''.join(map(escp,s))
return "'%s'" % (s,)
+def eintr_retry(func):
+ import functools
+ @functools.wraps(func)
+ def rv(*p, **kw):
+ retry = kw.pop("_retry", False)
+ for i in xrange(0 if retry else 4):
+ try:
+ return func(*p, **kw)
+ except (select.error, socket.error), args:
+ if args[0] == errno.EINTR:
+ continue
+ else:
+ raise
+ except OSError, e:
+ if e.errno == errno.EINTR:
+ continue
+ else:
+ raise
+ else:
+ return func(*p, **kw)
+ return rv
+
class Server(object):
- def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
+ def __init__(self, root_dir = ".", log_level = ERROR_LEVEL, environment_setup = ""):
self._root_dir = root_dir
self._stop = False
self._ctrl_sock = None
self._log_level = log_level
+ self._rdbuf = ""
+ self._environment_setup = environment_setup
def run(self):
try:
# first process (the one that did the first fork) returned.
os._exit(0)
except:
+ print >>sys.stderr, "SERVER_ERROR."
self.log_error()
self.cleanup()
os._exit(0)
+ print >>sys.stderr, "SERVER_READY."
def daemonize(self):
# pipes for process synchronization
(r, w) = os.pipe()
+
+ # build root folder
+ root = os.path.normpath(self._root_dir)
+ if not os.path.exists(root):
+ os.makedirs(root, 0755)
pid1 = os.fork()
if pid1 > 0:
os.close(w)
- os.read(r, 1)
+ while True:
+ try:
+ os.read(r, 1)
+ except OSError, e: # pragma: no cover
+ if e.errno == errno.EINTR:
+ continue
+ else:
+ raise
+ break
os.close(r)
# os.waitpid avoids leaving a <defunc> (zombie) process
st = os.waitpid(pid1, 0)[1]
# was opened with 0 buffer
os.dup2(stdout.fileno(), sys.stdout.fileno())
os.dup2(stderr.fileno(), sys.stderr.fileno())
+
+ # setup environment
+ if self._environment_setup:
+ # parse environment variables and pass to child process
+ # do it by executing shell commands, in case there's some heavy setup involved
+ envproc = subprocess.Popen(
+ [ "bash", "-c",
+ "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
+ ( self._environment_setup, ) ],
+ stdin = subprocess.PIPE,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE
+ )
+ out,err = envproc.communicate()
+
+ # parse new environment
+ if out:
+ environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
+
+ # apply to current environment
+ for name, value in environment.iteritems():
+ os.environ[name] = value
+
+ # apply pythonpath
+ if 'PYTHONPATH' in environment:
+ sys.path = environment['PYTHONPATH'].split(':') + sys.path
# create control socket
self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
return 1
def post_daemonize(self):
- pass
+ # QT, for some strange reason, redefines the SIGCHILD handler to write
+ # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
+ # Server dameonization closes all file descriptors from fileno '3',
+ # but the overloaded handler (inherited by the forked process) will
+ # keep trying to write the \0 to fileno 'x', which might have been reused
+ # after closing, for other operations. This is bad bad bad when fileno 'x'
+ # is in use for communication pouroses, because unexpected \0 start
+ # appearing in the communication messages... this is exactly what happens
+ # when using netns in daemonized form. Thus, be have no other alternative than
+ # restoring the SIGCHLD handler to the default here.
+ import signal
+ signal.signal(signal.SIGCHLD, signal.SIG_DFL)
def loop(self):
while not self._stop:
try:
msg = self.recv_msg(conn)
except socket.timeout, e:
+ self.log_error()
break
if msg == STOP_MSG:
self.log_error()
def recv_msg(self, conn):
- data = ""
- while True:
+ data = [self._rdbuf]
+ chunk = data[0]
+ while '\n' not in chunk:
try:
chunk = conn.recv(1024)
- except OSError, e:
- if e.errno != errno.EINTR:
+ except (OSError, socket.error), e:
+ if e[0] != errno.EINTR:
raise
- if chunk == '':
+ else:
continue
if chunk:
- data += chunk
- if chunk[-1] == "\n":
- break
+ data.append(chunk)
else:
# empty chunk = EOF
break
+ data = ''.join(data).split('\n',1)
+ while len(data) < 2:
+ data.append('')
+ data, self._rdbuf = data
+
decoded = base64.b64decode(data)
return decoded.rstrip()
self._ctrl_sock = None
self._root_dir = root_dir
self._stop = False
+ self._rdbuf = ""
def forward(self):
self.connect()
- print >>sys.stderr, "READY."
+ print >>sys.stderr, "FORWARDER_READY."
while not self._stop:
data = self.read_data()
self.send_to_server(data)
def send_to_server(self, data):
try:
self._ctrl_sock.send(data)
- except IOError, e:
- if e.errno == errno.EPIPE:
+ except (IOError, socket.error), e:
+ if e[0] == errno.EPIPE:
self.connect()
self._ctrl_sock.send(data)
else:
self._stop = True
def recv_from_server(self):
- data = ""
- while True:
+ data = [self._rdbuf]
+ chunk = data[0]
+ while '\n' not in chunk:
try:
chunk = self._ctrl_sock.recv(1024)
- except OSError, e:
- if e.errno != errno.EINTR:
+ except (OSError, socket.error), e:
+ if e[0] != errno.EINTR:
raise
- if chunk == '':
- continue
- data += chunk
- if chunk[-1] == "\n":
+ continue
+ if chunk:
+ data.append(chunk)
+ else:
+ # empty chunk = EOF
break
- return data
+ data = ''.join(data).split('\n',1)
+ while len(data) < 2:
+ data.append('')
+ data, self._rdbuf = data
+
+ return data+'\n'
def connect(self):
self.disconnect()
class Client(object):
def __init__(self, root_dir = ".", host = None, port = None, user = None,
- agent = None):
+ agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
+ environment_setup = ""):
self.root_dir = root_dir
self.addr = (host, port)
self.user = user
self.agent = agent
+ self.sudo = sudo
+ self.communication = communication
+ self.environment_setup = environment_setup
self._stopped = False
+ self._deferreds = collections.deque()
self.connect()
def __del__(self):
(host, port) = self.addr
user = self.user
agent = self.agent
+ sudo = self.sudo
+ communication = self.communication
python_code = "from nepi.util import server;c=server.Forwarder(%r);\
c.forward()" % (root_dir,)
- if host != None:
- self._process = popen_ssh_subprocess(python_code, host, port,
- user, agent)
- # popen_ssh_subprocess already waits for readiness
- else:
- self._process = subprocess.Popen(
- ["python", "-c", python_code],
- stdin = subprocess.PIPE,
- stdout = subprocess.PIPE,
- stderr = subprocess.PIPE
- )
-
+
+ self._process = popen_python(python_code,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ sudo = sudo,
+ environment_setup = self.environment_setup)
+
# Wait for the forwarder to be ready, otherwise nobody
# will be able to connect to it
helo = self._process.stderr.readline()
- if helo != 'READY.\n':
- raise AssertionError, "Expected 'Ready.', got %r: %s" % (helo,
+ if helo != 'FORWARDER_READY.\n':
+ raise AssertionError, "Expected 'FORWARDER_READY.', got %r: %s" % (helo,
helo + self._process.stderr.read())
def send_msg(self, msg):
self.send_msg(STOP_MSG)
self._stopped = True
- def read_reply(self):
+ def defer_reply(self, transform=None):
+ defer_entry = []
+ self._deferreds.append(defer_entry)
+ return defer.Defer(
+ functools.partial(self.read_reply, defer_entry, transform)
+ )
+
+ def _read_reply(self):
data = self._process.stdout.readline()
encoded = data.rstrip()
+ if not encoded:
+ # empty == eof == dead process, poll it to un-zombify
+ self._process.poll()
+
+ raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
return base64.b64decode(encoded)
+
+ def read_reply(self, which=None, transform=None):
+ # Test to see if someone did it already
+ if which is not None and len(which):
+ # Ok, they did it...
+ # ...just return the deferred value
+ if transform:
+ return transform(which[0])
+ else:
+ return which[0]
+
+ # Process all deferreds until the one we're looking for
+ # or until the queue is empty
+ while self._deferreds:
+ try:
+ deferred = self._deferreds.popleft()
+ except IndexError:
+ # emptied
+ break
+
+ deferred.append(self._read_reply())
+ if deferred is which:
+ # We reached the one we were looking for
+ if transform:
+ return transform(deferred[0])
+ else:
+ return deferred[0]
+
+ if which is None:
+ # They've requested a synchronous read
+ if transform:
+ return transform(self._read_reply())
+ else:
+ return self._read_reply()
def _make_server_key_args(server_key, host, port, args):
"""
tmp_known_hosts.flush()
args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+
return tmp_known_hosts
def popen_ssh_command(command, host, port, user, agent,
- stdin="",
- ident_key = None,
- server_key = None,
- tty = False):
- """
- Executes a remote commands, returns ((stdout,stderr),process)
- """
- if TRACE:
- print "ssh", host, command
-
- tmp_known_hosts = None
- args = ['ssh',
- # Don't bother with localhost. Makes test easier
- '-o', 'NoHostAuthenticationForLocalhost=yes',
- '-l', user, host]
- if agent:
- args.append('-A')
- if port:
- args.append('-p%d' % port)
- if ident_key:
- args.extend(('-i', ident_key))
- if tty:
- args.append('-t')
- if server_key:
- # Create a temporary server key file
- tmp_known_hosts = _make_server_key_args(
- server_key, host, port, args)
- args.append(command)
-
+ stdin="",
+ ident_key = None,
+ server_key = None,
+ tty = False,
+ timeout = None,
+ retry = 0,
+ err_on_timeout = True):
+ """
+ Executes a remote commands, returns ((stdout,stderr),process)
+ """
+ if TRACE:
+ print "ssh", host, command
+
+ tmp_known_hosts = None
+ args = ['ssh',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes',
+ '-l', user, host]
+ if agent:
+ args.append('-A')
+ if port:
+ args.append('-p%d' % port)
+ if ident_key:
+ args.extend(('-i', ident_key))
+ if tty:
+ args.append('-t')
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
+ args.append(command)
+
+ while 1:
# connects to the remote host and starts a remote connection
proc = subprocess.Popen(args,
stdout = subprocess.PIPE,
# alive until the process is finished with it
proc._known_hosts = tmp_known_hosts
- out, err = proc.communicate(stdin)
- if TRACE:
- print " -> ", out, err
+ try:
+ out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+ break
+ except RuntimeError,e:
+ if retry <= 0:
+ raise
+ if TRACE:
+ print " timedout -> ", e.args
+ retry -= 1
+
+ if TRACE:
+ print " -> ", out, err
+
+ return ((out, err), proc)
- return ((out, err), proc)
-
def popen_scp(source, dest,
- port = None,
- agent = None,
- recursive = False,
- ident_key = None,
- server_key = None):
- """
- Copies from/to remote sites.
-
- Source and destination should have the user and host encoded
- as per scp specs.
-
- If source is a file object, a special mode will be used to
- create the remote file with the same contents.
-
- If dest is a file object, the remote file (source) will be
- read and written into dest.
+ port = None,
+ agent = None,
+ recursive = False,
+ ident_key = None,
+ server_key = None):
+ """
+ Copies from/to remote sites.
+
+ Source and destination should have the user and host encoded
+ as per scp specs.
+
+ If source is a file object, a special mode will be used to
+ create the remote file with the same contents.
+
+ If dest is a file object, the remote file (source) will be
+ read and written into dest.
+
+ In these modes, recursive cannot be True.
+
+ Source can be a list of files to copy to a single destination,
+ in which case it is advised that the destination be a folder.
+ """
+
+ if TRACE:
+ print "scp", source, dest
+
+ if isinstance(source, file) and source.tell() == 0:
+ source = source.name
+ elif hasattr(source, 'read'):
+ tmp = tempfile.NamedTemporaryFile()
+ while True:
+ buf = source.read(65536)
+ if buf:
+ tmp.write(buf)
+ else:
+ break
+ tmp.seek(0)
+ source = tmp.name
+
+ if isinstance(source, file) or isinstance(dest, file) \
+ or hasattr(source, 'read') or hasattr(dest, 'write'):
+ assert not recursive
- In these modes, recursive cannot be True.
+ # Parse source/destination as <user>@<server>:<path>
+ if isinstance(dest, basestring) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, basestring) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError, "Both endpoints cannot be local"
+ user,host = remspec.rsplit('@',1)
+ tmp_known_hosts = None
- Source can be a list of files to copy to a single destination,
- in which case it is advised that the destination be a folder.
- """
+ args = ['ssh', '-l', user, '-C',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes',
+ host ]
+ if port:
+ args.append('-P%d' % port)
+ if ident_key:
+ args.extend(('-i', ident_key))
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
- if TRACE:
- print "scp", source, dest
+ if isinstance(source, file) or hasattr(source, 'read'):
+ args.append('cat > %s' % (shell_escape(path),))
+ elif isinstance(dest, file) or hasattr(dest, 'write'):
+ args.append('cat %s' % (shell_escape(path),))
+ else:
+ raise AssertionError, "Unreachable code reached! :-Q"
- if isinstance(source, file) or isinstance(dest, file) \
- or hasattr(source, 'read') or hasattr(dest, 'write'):
- assert not recursive
-
- # Parse source/destination as <user>@<server>:<path>
- if isinstance(dest, basestring) and ':' in dest:
- remspec, path = dest.split(':',1)
- elif isinstance(source, basestring) and ':' in source:
- remspec, path = source.split(':',1)
- else:
- raise ValueError, "Both endpoints cannot be local"
- user,host = remspec.rsplit('@',1)
- tmp_known_hosts = None
-
- args = ['ssh', '-l', user, '-C',
- # Don't bother with localhost. Makes test easier
- '-o', 'NoHostAuthenticationForLocalhost=yes',
- host ]
- if port:
- args.append('-P%d' % port)
- if ident_key:
- args.extend(('-i', ident_key))
- if server_key:
- # Create a temporary server key file
- tmp_known_hosts = _make_server_key_args(
- server_key, host, port, args)
+ # connects to the remote host and starts a remote connection
+ if isinstance(source, file):
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = source)
+ err = proc.stderr.read()
+ proc._known_hosts = tmp_known_hosts
+ eintr_retry(proc.wait)()
+ return ((None,err), proc)
+ elif isinstance(dest, file):
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = source)
+ err = proc.stderr.read()
+ proc._known_hosts = tmp_known_hosts
+ eintr_retry(proc.wait)()
+ return ((None,err), proc)
+ elif hasattr(source, 'read'):
+ # file-like (but not file) source
+ proc = subprocess.Popen(args,
+ stdout = open('/dev/null','w'),
+ stderr = subprocess.PIPE,
+ stdin = subprocess.PIPE)
- if isinstance(source, file) or hasattr(source, 'read'):
- args.append('cat > %s' % (shell_escape(path),))
- elif isinstance(dest, file) or hasattr(dest, 'write'):
- args.append('cat %s' % (shell_escape(path),))
- else:
- raise AssertionError, "Unreachable code reached! :-Q"
+ buf = None
+ err = []
+ while True:
+ if not buf:
+ buf = source.read(4096)
+ if not buf:
+ #EOF
+ break
+
+ rdrdy, wrdy, broken = select.select(
+ [proc.stderr],
+ [proc.stdin],
+ [proc.stderr,proc.stdin])
+
+ if proc.stderr in rdrdy:
+ # use os.read for fully unbuffered behavior
+ err.append(os.read(proc.stderr.fileno(), 4096))
+
+ if proc.stdin in wrdy:
+ proc.stdin.write(buf)
+ buf = None
+
+ if broken:
+ break
+ proc.stdin.close()
+ err.append(proc.stderr.read())
+
+ proc._known_hosts = tmp_known_hosts
+ eintr_retry(proc.wait)()
+ return ((None,''.join(err)), proc)
+ elif hasattr(dest, 'write'):
+ # file-like (but not file) dest
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE,
+ stdin = open('/dev/null','w'))
- # connects to the remote host and starts a remote connection
- if isinstance(source, file):
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = source)
- err = proc.stderr.read()
- proc._known_hosts = tmp_known_hosts
- proc.wait()
- return ((None,err), proc)
- elif isinstance(dest, file):
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = source)
- err = proc.stderr.read()
- proc._known_hosts = tmp_known_hosts
- proc.wait()
- return ((None,err), proc)
- elif hasattr(source, 'read'):
- # file-like (but not file) source
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = subprocess.PIPE)
+ buf = None
+ err = []
+ while True:
+ rdrdy, wrdy, broken = select.select(
+ [proc.stderr, proc.stdout],
+ [],
+ [proc.stderr, proc.stdout])
- buf = None
- err = []
- while True:
- if not buf:
- buf = source.read(4096)
+ if proc.stderr in rdrdy:
+ # use os.read for fully unbuffered behavior
+ err.append(os.read(proc.stderr.fileno(), 4096))
+
+ if proc.stdout in rdrdy:
+ # use os.read for fully unbuffered behavior
+ buf = os.read(proc.stdout.fileno(), 4096)
+ dest.write(buf)
+
if not buf:
#EOF
break
-
- rdrdy, wrdy, broken = select.select(
- [proc.stderr],
- [proc.stdin],
- [proc.stderr,proc.stdin])
-
- if proc.stderr in rdrdy:
- # use os.read for fully unbuffered behavior
- err.append(os.read(proc.stderr.fileno(), 4096))
-
- if proc.stdin in wrdy:
- proc.stdin.write(buf)
- buf = None
-
- if broken:
- break
- proc.stdin.close()
- err.append(proc.stderr.read())
-
- proc._known_hosts = tmp_known_hosts
- proc.wait()
- return ((None,''.join(err)), proc)
- elif hasattr(dest, 'write'):
- # file-like (but not file) dest
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stderr = subprocess.PIPE,
- stdin = open('/dev/null','w'))
- buf = None
- err = []
- while True:
- rdrdy, wrdy, broken = select.select(
- [proc.stderr, proc.stdout],
- [],
- [proc.stderr, proc.stdout])
-
- if proc.stderr in rdrdy:
- # use os.read for fully unbuffered behavior
- err.append(os.read(proc.stderr.fileno(), 4096))
-
- if proc.stdout in rdrdy:
- # use os.read for fully unbuffered behavior
- buf = os.read(proc.stdout.fileno(), 4096)
- dest.write(buf)
-
- if not buf:
- #EOF
- break
-
- if broken:
- break
- err.append(proc.stderr.read())
-
- proc._known_hosts = tmp_known_hosts
- proc.wait()
- return ((None,''.join(err)), proc)
- else:
- raise AssertionError, "Unreachable code reached! :-Q"
+ if broken:
+ break
+ err.append(proc.stderr.read())
+
+ proc._known_hosts = tmp_known_hosts
+ eintr_retry(proc.wait)()
+ return ((None,''.join(err)), proc)
else:
- # Parse destination as <user>@<server>:<path>
- if isinstance(dest, basestring) and ':' in dest:
- remspec, path = dest.split(':',1)
- elif isinstance(source, basestring) and ':' in source:
- remspec, path = source.split(':',1)
- else:
- raise ValueError, "Both endpoints cannot be local"
- user,host = remspec.rsplit('@',1)
-
- # plain scp
- tmp_known_hosts = None
- args = ['scp', '-q', '-p', '-C',
- # Don't bother with localhost. Makes test easier
- '-o', 'NoHostAuthenticationForLocalhost=yes' ]
- if port:
- args.append('-P%d' % port)
- if recursive:
- args.append('-r')
- if ident_key:
- args.extend(('-i', ident_key))
- if server_key:
- # Create a temporary server key file
- tmp_known_hosts = _make_server_key_args(
- server_key, host, port, args)
- if isinstance(source,list):
- args.extend(source)
- else:
- args.append(source)
- args.append(dest)
+ raise AssertionError, "Unreachable code reached! :-Q"
+ else:
+ # Parse destination as <user>@<server>:<path>
+ if isinstance(dest, basestring) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, basestring) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError, "Both endpoints cannot be local"
+ user,host = remspec.rsplit('@',1)
+
+ # plain scp
+ tmp_known_hosts = None
+ args = ['scp', '-q', '-p', '-C',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes' ]
+ if port:
+ args.append('-P%d' % port)
+ if recursive:
+ args.append('-r')
+ if ident_key:
+ args.extend(('-i', ident_key))
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = _make_server_key_args(
+ server_key, host, port, args)
+ if isinstance(source,list):
+ args.extend(source)
+ else:
+ args.append(source)
+ args.append(dest)
- # connects to the remote host and starts a remote connection
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stdin = subprocess.PIPE,
- stderr = subprocess.PIPE)
- proc._known_hosts = tmp_known_hosts
-
- comm = proc.communicate()
- proc.wait()
- return (comm, proc)
-
-def popen_ssh_subprocess(python_code, host, port, user, agent,
+ # connects to the remote host and starts a remote connection
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+ proc._known_hosts = tmp_known_hosts
+
+ comm = proc.communicate()
+ eintr_retry(proc.wait)()
+ return (comm, proc)
+
+def decode_and_execute():
+ # The python code we want to execute might have characters that
+ # are not compatible with the 'inline' mode we are using. To avoid
+ # problems we receive the encoded python code in base64 as a input
+ # stream and decode it for execution.
+ import base64, os
+ cmd = ""
+ while True:
+ cmd += os.read(0, 1)# one byte from stdin
+ if cmd[-1] == "\n":
+ break
+ cmd = base64.b64decode(cmd)
+ # Uncomment for debug
+ #os.write(2, "Executing python code: %s\n" % cmd)
+ os.write(1, "OK\n") # send a sync message
+ exec(cmd)
+
+def popen_python(python_code,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ agent = False,
python_path = None,
ident_key = None,
server_key = None,
- tty = False):
- if python_path:
- python_path.replace("'", r"'\''")
- cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
- else:
- cmd = ""
- # Uncomment for debug (to run everything under strace)
- # We had to verify if strace works (cannot nest them)
- #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
- #cmd += "$CMD "
- #if self.mode == MODE_SSH:
- # cmd += "strace -f -tt -s 200 -o strace$$.out "
- cmd += "python -c '"
- cmd += "import base64, os\n"
- cmd += "cmd = \"\"\n"
- cmd += "while True:\n"
- cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
- cmd += " if cmd[-1] == \"\\n\": break\n"
- cmd += "cmd = base64.b64decode(cmd)\n"
- # Uncomment for debug
- #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
- cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
- cmd += "exec(cmd)\n'"
-
+ tty = False,
+ sudo = False,
+ environment_setup = ""):
+
+
+ shell = False
+ cmd = ""
+ if python_path:
+ python_path.replace("'", r"'\''")
+ cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
+ cmd += " ; "
+ if environment_setup:
+ cmd += environment_setup
+ cmd += " ; "
+ # Uncomment for debug (to run everything under strace)
+ # We had to verify if strace works (cannot nest them)
+ #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
+ #cmd += "$CMD "
+ #cmd += "strace -f -tt -s 200 -o strace$$.out "
+ cmd += "python -c 'from nepi.util import server; server.decode_and_execute()'"
+
+ if communication == DC.ACCESS_SSH:
tmp_known_hosts = None
args = ['ssh',
# Don't bother with localhost. Makes test easier
tmp_known_hosts = _make_server_key_args(
server_key, host, port, args)
args.append(cmd)
+ else:
+ args = [cmd]
+ shell = True
- # connects to the remote host and starts a remote rpyc connection
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stdin = subprocess.PIPE,
- stderr = subprocess.PIPE)
+ # connects to the remote host and starts a remote
+ proc = subprocess.Popen(args,
+ shell = shell,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ if communication == DC.ACCESS_SSH:
proc._known_hosts = tmp_known_hosts
-
- # send the command to execute
- os.write(proc.stdin.fileno(),
- base64.b64encode(python_code) + "\n")
- msg = os.read(proc.stdout.fileno(), 3)
- if msg != "OK\n":
- raise RuntimeError("Failed to start remote python interpreter")
- return proc
-
+
+ # send the command to execute
+ os.write(proc.stdin.fileno(),
+ base64.b64encode(python_code) + "\n")
+ msg = os.read(proc.stdout.fileno(), 3)
+ if msg != "OK\n":
+ raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
+ msg, proc.stdout.read(), proc.stderr.read())
+
+ return proc
+
+# POSIX
+def _communicate(self, input, timeout=None, err_on_timeout=True):
+ read_set = []
+ write_set = []
+ stdout = None # Return
+ stderr = None # Return
+
+ killed = False
+
+ if timeout is not None:
+ timelimit = time.time() + timeout
+ killtime = timelimit + 4
+ bailtime = timelimit + 4
+
+ if self.stdin:
+ # Flush stdio buffer. This might block, if the user has
+ # been writing to .stdin in an uncontrolled fashion.
+ self.stdin.flush()
+ if input:
+ write_set.append(self.stdin)
+ else:
+ self.stdin.close()
+ if self.stdout:
+ read_set.append(self.stdout)
+ stdout = []
+ if self.stderr:
+ read_set.append(self.stderr)
+ stderr = []
+
+ input_offset = 0
+ while read_set or write_set:
+ if timeout is not None:
+ curtime = time.time()
+ if timeout is None or curtime > timelimit:
+ if curtime > bailtime:
+ break
+ elif curtime > killtime:
+ signum = signal.SIGKILL
+ else:
+ signum = signal.SIGTERM
+ # Lets kill it
+ os.kill(self.pid, signum)
+ select_timeout = 0.5
+ else:
+ select_timeout = timelimit - curtime + 0.1
+ else:
+ select_timeout = None
+
+ try:
+ rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
+ except select.error,e:
+ if e[0] != 4:
+ raise
+ else:
+ continue
+
+ if self.stdin in wlist:
+ # When select has indicated that the file is writable,
+ # we can write up to PIPE_BUF bytes without risk
+ # blocking. POSIX defines PIPE_BUF >= 512
+ bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+ input_offset += bytes_written
+ if input_offset >= len(input):
+ self.stdin.close()
+ write_set.remove(self.stdin)
+
+ if self.stdout in rlist:
+ data = os.read(self.stdout.fileno(), 1024)
+ if data == "":
+ self.stdout.close()
+ read_set.remove(self.stdout)
+ stdout.append(data)
+
+ if self.stderr in rlist:
+ data = os.read(self.stderr.fileno(), 1024)
+ if data == "":
+ self.stderr.close()
+ read_set.remove(self.stderr)
+ stderr.append(data)
+
+ # All data exchanged. Translate lists into strings.
+ if stdout is not None:
+ stdout = ''.join(stdout)
+ if stderr is not None:
+ stderr = ''.join(stderr)
+
+ # Translate newlines, if requested. We cannot let the file
+ # object do the translation: It is based on stdio, which is
+ # impossible to combine with select (unless forcing no
+ # buffering).
+ if self.universal_newlines and hasattr(file, 'newlines'):
+ if stdout:
+ stdout = self._translate_newlines(stdout)
+ if stderr:
+ stderr = self._translate_newlines(stderr)
+
+ if killed and err_on_timeout:
+ errcode = self.poll()
+ raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
+ else:
+ if killed:
+ self.poll()
+ else:
+ self.wait()
+ return (stdout, stderr)
+