# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+# Claudio Freire <claudio-daniel.freire@inria.fr>
+
+## TODO: This code needs reviewing !!!
import base64
import errno
import time
import tempfile
+_re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
+
logger = logging.getLogger("sshfuncs")
-def log(msg, level, out = None, err = None):
+def log(msg, level = logging.DEBUG, out = None, err = None):
if out:
msg += " - OUT: %s " % out
-
if err:
msg += " - ERROR: %s " % err
-
logger.log(level, msg)
-
if hasattr(os, "devnull"):
DEV_NULL = os.devnull
else:
Special value that when given to rspawn in stderr causes stderr to
redirect to whatever stdout was redirected to.
"""
+ pass
class ProcStatus:
"""
hostbyname_cache = dict()
hostbyname_cache_lock = threading.Lock()
+def resolve_hostname(host):
+ ip = None
+
+ if host in ["localhost", "127.0.0.1", "::1"]:
+ p = subprocess.Popen(
+ "ip -o addr list",
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines = True,
+ )
+ stdout, stderr = p.communicate()
+ m = _re_inet.findall(stdout)
+ ip = m[0][1].split("/")[0]
+ else:
+ ip = socket.gethostbyname(host)
+
+ return ip
+
def gethostbyname(host):
global hostbyname_cache
global hostbyname_cache_lock
hostbyname = hostbyname_cache.get(host)
if not hostbyname:
with hostbyname_cache_lock:
- hostbyname = socket.gethostbyname(host)
+ hostbyname = resolve_hostname(host)
hostbyname_cache[host] = hostbyname
msg = " Added hostbyname %s - %s " % (host, hostbyname)
"""
global OPENSSH_HAS_PERSIST
if OPENSSH_HAS_PERSIST is None:
- proc = subprocess.Popen(["ssh","-v"],
+ proc = subprocess.Popen(
+ ["ssh", "-v"],
stdout = subprocess.PIPE,
stderr = subprocess.STDOUT,
- stdin = open("/dev/null","r") )
+ stdin = subprocess.DEVNULL,
+ universal_newlines = True,
+ )
out,err = proc.communicate()
proc.wait()
return s
else:
# unsafe string - escape
- def escp(c):
+ def escape(c):
if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
return c
else:
return "'$'\\x%02x''" % (ord(c),)
- s = ''.join(map(escp,s))
+ s = ''.join(map(escape, s))
return "'%s'" % (s,)
def eintr_retry(func):
@functools.wraps(func)
def rv(*p, **kw):
retry = kw.pop("_retry", False)
- for i in xrange(0 if retry else 4):
+ for i in range(0 if retry else 4):
try:
return func(*p, **kw)
- except (select.error, socket.error), args:
+ except (select.error, socket.error) as args:
if args[0] == errno.EINTR:
continue
else:
raise
- except OSError, e:
+ except OSError as e:
if e.errno == errno.EINTR:
continue
else:
return rv
def rexec(command, host, user,
- port = None,
+ port = None,
+ gwuser = None,
+ gw = None,
agent = True,
sudo = False,
- stdin = None,
identity = None,
server_key = None,
env = None,
tty = False,
- timeout = None,
- retry = 3,
- err_on_timeout = True,
connect_timeout = 30,
+ retry = 3,
persistent = True,
forward_x11 = False,
+ blocking = True,
strict_host_checking = True):
"""
Executes a remote command, returns ((stdout,stderr),process)
"""
-
+
tmp_known_hosts = None
- hostip = gethostbyname(host)
+ if not gw:
+ hostip = gethostbyname(host)
+ else: hostip = None
args = ['ssh', '-C',
# Don't bother with localhost. Makes test easier
'-o', 'ConnectionAttempts=3',
'-o', 'ServerAliveInterval=30',
'-o', 'TCPKeepAlive=yes',
+ '-o', 'Batchmode=yes',
'-l', user, hostip or host]
if persistent and openssh_has_persist():
# Do not check for Host key. Unsafe.
args.extend(['-o', 'StrictHostKeyChecking=no'])
+ if gw:
+ proxycommand = _proxy_command(gw, gwuser, identity)
+ args.extend(['-o', proxycommand])
+
if agent:
args.append('-A')
args.append('-p%d' % port)
if identity:
+ identity = os.path.expanduser(identity)
args.extend(('-i', identity))
if tty:
tmp_known_hosts = make_server_key_args(server_key, host, port)
args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
- args.append(command)
+ if sudo:
+ command = "sudo " + command
- for x in xrange(retry):
- # connects to the remote host and starts a remote connection
- proc = subprocess.Popen(args,
- env = env,
- stdout = subprocess.PIPE,
- stdin = subprocess.PIPE,
- stderr = subprocess.PIPE)
-
- # attach tempfile object to the process, to make sure the file stays
- # alive until the process is finished with it
- proc._known_hosts = tmp_known_hosts
-
- try:
- out, err = _communicate(proc, stdin, timeout, err_on_timeout)
- msg = " rexec - host %s - command %s " % (host, " ".join(args))
- log(msg, logging.DEBUG, out, err)
+ args.append(command)
- if proc.poll():
- skip = False
+ log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
- if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
- # SSH error, can safely retry
- skip = True
- elif retry:
- # Probably timed out or plain failed but can retry
- skip = True
-
- if skip:
- t = x*2
- msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
- t, x, host, " ".join(args))
- log(msg, logging.DEBUG)
-
- time.sleep(t)
- continue
- break
- except RuntimeError, e:
- msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
- log(msg, logging.DEBUG, out, err)
+ stdout = stderr = stdin = subprocess.PIPE
+ if forward_x11:
+ stdout = stderr = stdin = None
- if retry <= 0:
- raise
- retry -= 1
-
- return ((out, err), proc)
+ return _retry_rexec(args, log_msg,
+ stderr = stderr,
+ stdin = stdin,
+ stdout = stdout,
+ env = env,
+ retry = retry,
+ tmp_known_hosts = tmp_known_hosts,
+ blocking = blocking)
def rcopy(source, dest,
- port = None,
- agent = True,
+ port = None,
+ gwuser = None,
+ gw = None,
recursive = False,
identity = None,
server_key = None,
Source and destination should have the user and host encoded
as per scp specs.
- If source is a file object, a special mode will be used to
- create the remote file with the same contents.
-
- If dest is a file object, the remote file (source) will be
- read and written into dest.
-
- In these modes, recursive cannot be True.
-
- Source can be a list of files to copy to a single destination,
- in which case it is advised that the destination be a folder.
+ Source can be a list of files to copy to a single destination,
+ (in which case it is advised that the destination be a folder),
+ or a single file in a string.
"""
-
- if isinstance(source, file) and source.tell() == 0:
- source = source.name
- elif hasattr(source, 'read'):
- tmp = tempfile.NamedTemporaryFile()
- while True:
- buf = source.read(65536)
- if buf:
- tmp.write(buf)
- else:
- break
- tmp.seek(0)
- source = tmp.name
-
- if isinstance(source, file) or isinstance(dest, file) \
- or hasattr(source, 'read') or hasattr(dest, 'write'):
- assert not recursive
-
- # Parse source/destination as <user>@<server>:<path>
- if isinstance(dest, basestring) and ':' in dest:
- remspec, path = dest.split(':',1)
- elif isinstance(source, basestring) and ':' in source:
- remspec, path = source.split(':',1)
- else:
- raise ValueError, "Both endpoints cannot be local"
- user,host = remspec.rsplit('@',1)
-
- tmp_known_hosts = None
- hostip = gethostbyname(host)
-
- args = ['ssh', '-l', user, '-C',
- # Don't bother with localhost. Makes test easier
- '-o', 'NoHostAuthenticationForLocalhost=yes',
- '-o', 'ConnectTimeout=60',
- '-o', 'ConnectionAttempts=3',
- '-o', 'ServerAliveInterval=30',
- '-o', 'TCPKeepAlive=yes',
- hostip or host ]
- if openssh_has_persist():
- args.extend([
- '-o', 'ControlMaster=auto',
- '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
- '-o', 'ControlPersist=60' ])
-
- if port:
- args.append('-P%d' % port)
-
- if identity:
- args.extend(('-i', identity))
+ # Parse destination as <user>@<server>:<path>
+ if isinstance(dest, str) and ':' in dest:
+ remspec, path = dest.split(':',1)
+ elif isinstance(source, str) and ':' in source:
+ remspec, path = source.split(':',1)
+ else:
+ raise ValueError("Both endpoints cannot be local")
+ user,host = remspec.rsplit('@',1)
+
+ # plain scp
+ tmp_known_hosts = None
- if server_key:
- # Create a temporary server key file
- tmp_known_hosts = make_server_key_args(server_key, host, port)
- args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
-
- if isinstance(source, file) or hasattr(source, 'read'):
- args.append('cat > %s' % (shell_escape(path),))
- elif isinstance(dest, file) or hasattr(dest, 'write'):
- args.append('cat %s' % (shell_escape(path),))
- else:
- raise AssertionError, "Unreachable code reached! :-Q"
-
- # connects to the remote host and starts a remote connection
- if isinstance(source, file):
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = source)
- err = proc.stderr.read()
- proc._known_hosts = tmp_known_hosts
- eintr_retry(proc.wait)()
- return ((None,err), proc)
- elif isinstance(dest, file):
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = source)
- err = proc.stderr.read()
- proc._known_hosts = tmp_known_hosts
- eintr_retry(proc.wait)()
- return ((None,err), proc)
- elif hasattr(source, 'read'):
- # file-like (but not file) source
- proc = subprocess.Popen(args,
- stdout = open('/dev/null','w'),
- stderr = subprocess.PIPE,
- stdin = subprocess.PIPE)
-
- buf = None
- err = []
- while True:
- if not buf:
- buf = source.read(4096)
- if not buf:
- #EOF
- break
-
- rdrdy, wrdy, broken = select.select(
- [proc.stderr],
- [proc.stdin],
- [proc.stderr,proc.stdin])
-
- if proc.stderr in rdrdy:
- # use os.read for fully unbuffered behavior
- err.append(os.read(proc.stderr.fileno(), 4096))
-
- if proc.stdin in wrdy:
- proc.stdin.write(buf)
- buf = None
-
- if broken:
- break
- proc.stdin.close()
- err.append(proc.stderr.read())
-
- proc._known_hosts = tmp_known_hosts
- eintr_retry(proc.wait)()
- return ((None,''.join(err)), proc)
- elif hasattr(dest, 'write'):
- # file-like (but not file) dest
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stderr = subprocess.PIPE,
- stdin = open('/dev/null','w'))
+ args = ['scp', '-q', '-p', '-C',
+ # 2015-06-01 Thierry: I am commenting off blowfish
+ # as this is not available on a plain ubuntu 15.04 install
+ # this IMHO is too fragile, shoud be something the user
+ # decides explicitly (so he is at least aware of that dependency)
+ # Speed up transfer using blowfish cypher specification which is
+ # faster than the default one (3des)
+ # '-c', 'blowfish',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes',
+ '-o', 'ConnectTimeout=60',
+ '-o', 'ConnectionAttempts=3',
+ '-o', 'ServerAliveInterval=30',
+ '-o', 'TCPKeepAlive=yes' ]
- buf = None
- err = []
- while True:
- rdrdy, wrdy, broken = select.select(
- [proc.stderr, proc.stdout],
- [],
- [proc.stderr, proc.stdout])
-
- if proc.stderr in rdrdy:
- # use os.read for fully unbuffered behavior
- err.append(os.read(proc.stderr.fileno(), 4096))
-
- if proc.stdout in rdrdy:
- # use os.read for fully unbuffered behavior
- buf = os.read(proc.stdout.fileno(), 4096)
- dest.write(buf)
-
- if not buf:
- #EOF
- break
-
- if broken:
- break
- err.append(proc.stderr.read())
-
- proc._known_hosts = tmp_known_hosts
- eintr_retry(proc.wait)()
- return ((None,''.join(err)), proc)
- else:
- raise AssertionError, "Unreachable code reached! :-Q"
- else:
- # Parse destination as <user>@<server>:<path>
- if isinstance(dest, basestring) and ':' in dest:
- remspec, path = dest.split(':',1)
- elif isinstance(source, basestring) and ':' in source:
- remspec, path = source.split(':',1)
- else:
- raise ValueError, "Both endpoints cannot be local"
- user,host = remspec.rsplit('@',1)
-
- # plain scp
- tmp_known_hosts = None
-
- args = ['scp', '-q', '-p', '-C',
- # Speed up transfer using blowfish cypher specification which is
- # faster than the default one (3des)
- '-c', 'blowfish',
- # Don't bother with localhost. Makes test easier
- '-o', 'NoHostAuthenticationForLocalhost=yes',
- '-o', 'ConnectTimeout=60',
- '-o', 'ConnectionAttempts=3',
- '-o', 'ServerAliveInterval=30',
- '-o', 'TCPKeepAlive=yes' ]
-
- if port:
- args.append('-P%d' % port)
+ if port:
+ args.append('-P%d' % port)
- if recursive:
- args.append('-r')
+ if gw:
+ proxycommand = _proxy_command(gw, gwuser, identity)
+ args.extend(['-o', proxycommand])
- if identity:
- args.extend(('-i', identity))
+ if recursive:
+ args.append('-r')
- if server_key:
- # Create a temporary server key file
- tmp_known_hosts = make_server_key_args(server_key, host, port)
- args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+ if identity:
+ identity = os.path.expanduser(identity)
+ args.extend(('-i', identity))
- if not strict_host_checking:
- # Do not check for Host key. Unsafe.
- args.extend(['-o', 'StrictHostKeyChecking=no'])
+ if server_key:
+ # Create a temporary server key file
+ tmp_known_hosts = make_server_key_args(server_key, host, port)
+ args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
- if isinstance(source,list):
- args.extend(source)
- else:
- if openssh_has_persist():
- args.extend([
- '-o', 'ControlMaster=auto',
- '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
- ])
- args.append(source)
+ if not strict_host_checking:
+ # Do not check for Host key. Unsafe.
+ args.extend(['-o', 'StrictHostKeyChecking=no'])
+
+ if isinstance(source, list):
+ args.extend(source)
+ else:
+ if openssh_has_persist():
+ args.extend([
+ '-o', 'ControlMaster=auto',
+ '-o', 'ControlPath=%s' % (make_control_path(False, False),)
+ ])
+ args.append(source)
+ if isinstance(dest, list):
+ args.extend(dest)
+ else:
args.append(dest)
- for x in xrange(retry):
- # connects to the remote host and starts a remote connection
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stdin = subprocess.PIPE,
- stderr = subprocess.PIPE)
-
- # attach tempfile object to the process, to make sure the file stays
- # alive until the process is finished with it
- proc._known_hosts = tmp_known_hosts
-
- try:
- (out, err) = proc.communicate()
- eintr_retry(proc.wait)()
- msg = " rcopy - host %s - command %s " % (host, " ".join(args))
- log(msg, logging.DEBUG, out, err)
-
- if proc.poll():
- t = x*2
- msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % (
- t, x, host, " ".join(args))
- log(msg, logging.DEBUG)
-
- time.sleep(t)
- continue
-
- break
- except RuntimeError, e:
- msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
- log(msg, logging.DEBUG, out, err)
-
- if retry <= 0:
- raise
- retry -= 1
-
- return ((out, err), proc)
+ log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
+
+ return _retry_rexec(args, log_msg, env = None, retry = retry,
+ tmp_known_hosts = tmp_known_hosts,
+ blocking = True)
def rspawn(command, pidfile,
- stdout = '/dev/null',
- stderr = STDOUT,
- stdin = '/dev/null',
- home = None,
- create_home = False,
- sudo = False,
- host = None,
- port = None,
- user = None,
- agent = None,
- identity = None,
- server_key = None,
- tty = False):
+ stdout = '/dev/null',
+ stderr = STDOUT,
+ stdin = '/dev/null',
+ home = None,
+ create_home = False,
+ sudo = False,
+ host = None,
+ port = None,
+ user = None,
+ gwuser = None,
+ gw = None,
+ agent = None,
+ identity = None,
+ server_key = None,
+ tty = False,
+ strict_host_checking = True):
"""
Spawn a remote command such that it will continue working asynchronously in
background.
:param sudo: Flag forcing execution with sudo user
:type sudo: bool
- :rtype: touple
+ :rtype: tuple
(stdout, stderr), process
else:
stderr = ' ' + stderr
- daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+ daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
'command' : command,
'pidfile' : shell_escape(pidfile),
'stdout' : stdout,
host = host,
port = port,
user = user,
+ gwuser = gwuser,
+ gw = gw,
agent = agent,
identity = identity,
server_key = server_key,
- tty = tty ,
+ tty = tty,
+ strict_host_checking = strict_host_checking ,
)
if proc.wait():
- raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
+ raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
return ((out, err), proc)
@eintr_retry
def rgetpid(pidfile,
- host = None,
- port = None,
- user = None,
- agent = None,
- identity = None,
- server_key = None):
+ host = None,
+ port = None,
+ user = None,
+ gwuser = None,
+ gw = None,
+ agent = None,
+ identity = None,
+ server_key = None,
+ strict_host_checking = True):
"""
Returns the pid and ppid of a process from a remote file where the
information was stored.
host = host,
port = port,
user = user,
+ gwuser = gwuser,
+ gw = gw,
agent = agent,
identity = identity,
- server_key = server_key
+ server_key = server_key,
+ strict_host_checking = strict_host_checking
)
if proc.wait():
if out:
try:
- return map(int,out.strip().split(' ',1))
+ return [ int(x) for x in out.strip().split(' ',1)) ]
except:
# Ignore, many ways to fail that don't matter that much
return None
host = None,
port = None,
user = None,
+ gwuser = None,
+ gw = None,
agent = None,
identity = None,
- server_key = None):
+ server_key = None,
+ strict_host_checking = True):
"""
Returns a code representing the the status of a remote process
host = host,
port = port,
user = user,
+ gwuser = gwuser,
+ gw = gw,
agent = agent,
identity = identity,
- server_key = server_key
+ server_key = server_key,
+ strict_host_checking = strict_host_checking
)
if proc.wait():
host = None,
port = None,
user = None,
+ gwuser = None,
+ gw = None,
agent = None,
sudo = False,
identity = None,
server_key = None,
- nowait = False):
+ nowait = False,
+ strict_host_checking = True):
"""
Sends a kill signal to a remote process.
host = host,
port = port,
user = user,
+ gwuser = gwuser,
+ gw = gw,
agent = agent,
identity = identity,
- server_key = server_key
+ server_key = server_key,
+ strict_host_checking = strict_host_checking
)
# wait, don't leave zombies around
return (out, err), proc
+def _retry_rexec(args,
+ log_msg,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE,
+ env = None,
+ retry = 3,
+ tmp_known_hosts = None,
+ blocking = True):
+
+ for x in range(retry):
+ # display command actually invoked when debug is turned on
+ message = " ".join( [ "'{}'".format(arg) for arg in args ] )
+ log("sshfuncs: invoking {}".format(message), logging.DEBUG)
+ # connects to the remote host and starts a remote connection
+ proc = subprocess.Popen(
+ args,
+ env = env,
+ stdout = stdout,
+ stdin = stdin,
+ stderr = stderr,
+ universal_newlines = True,
+ )
+ # attach tempfile object to the process, to make sure the file stays
+ # alive until the process is finished with it
+ proc._known_hosts = tmp_known_hosts
+
+ # The argument block == False forces to rexec to return immediately,
+ # without blocking
+ try:
+ err = out = " "
+ if blocking:
+ #(out, err) = proc.communicate()
+ # The method communicate was re implemented for performance issues
+ # when using python subprocess communicate method the ssh commands
+ # last one minute each
+ #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
+ out, err = _communicate(proc, input=None)
+ #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
+
+ elif stdout:
+ out = proc.stdout.read()
+ if proc.poll() and stderr:
+ err = proc.stderr.read()
+
+ log(log_msg, logging.DEBUG, out, err)
+
+ if proc.poll():
+ skip = False
+
+ if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
+ # SSH error, can safely retry
+ skip = True
+ elif retry:
+ # Probably timed out or plain failed but can retry
+ skip = True
+
+ if skip:
+ t = x*2
+ msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
+ t, x, " ".join(args))
+ log(msg, logging.DEBUG)
+
+ time.sleep(t)
+ continue
+ break
+ except RuntimeError as e:
+ msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
+ log(msg, logging.DEBUG, out, err)
+
+ if retry <= 0:
+ raise
+ retry -= 1
+
+ return ((out, err), proc)
+
# POSIX
-def _communicate(self, input, timeout=None, err_on_timeout=True):
+# Don't remove. The method communicate was re implemented for performance issues
+def _communicate(proc, input, timeout=None, err_on_timeout=True):
read_set = []
write_set = []
stdout = None # Return
stderr = None # Return
-
+
killed = False
-
+
if timeout is not None:
timelimit = time.time() + timeout
killtime = timelimit + 4
bailtime = timelimit + 4
- if self.stdin:
+ if proc.stdin:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
- self.stdin.flush()
+ proc.stdin.flush()
if input:
- write_set.append(self.stdin)
+ write_set.append(proc.stdin)
else:
- self.stdin.close()
- if self.stdout:
- read_set.append(self.stdout)
+ proc.stdin.close()
+
+ if proc.stdout:
+ read_set.append(proc.stdout)
stdout = []
- if self.stderr:
- read_set.append(self.stderr)
+
+ if proc.stderr:
+ read_set.append(proc.stderr)
stderr = []
input_offset = 0
else:
signum = signal.SIGTERM
# Lets kill it
- os.kill(self.pid, signum)
+ os.kill(proc.pid, signum)
select_timeout = 0.5
else:
select_timeout = timelimit - curtime + 0.1
else:
select_timeout = 1.0
-
+
if select_timeout > 1.0:
select_timeout = 1.0
-
+
try:
rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
- except select.error,e:
+ except select.error as e:
if e[0] != 4:
raise
else:
continue
-
- if not rlist and not wlist and not xlist and self.poll() is not None:
+
+ if not rlist and not wlist and not xlist and proc.poll() is not None:
# timeout and process exited, say bye
break
- if self.stdin in wlist:
+ if proc.stdin in wlist:
# When select has indicated that the file is writable,
# we can write up to PIPE_BUF bytes without risk
# blocking. POSIX defines PIPE_BUF >= 512
- bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+ bytes_written = os.write(proc.stdin.fileno(),
+ buffer(input, input_offset, 512))
input_offset += bytes_written
+
if input_offset >= len(input):
- self.stdin.close()
- write_set.remove(self.stdin)
-
- if self.stdout in rlist:
- data = os.read(self.stdout.fileno(), 1024)
- if data == "":
- self.stdout.close()
- read_set.remove(self.stdout)
+ proc.stdin.close()
+ write_set.remove(proc.stdin)
+
+ if proc.stdout in rlist:
+ # python2 version used to do this
+ # data = os.read(proc.stdout.fileno(), 1024)
+ # however this always returned bytes...
+ data = proc.stdout.read()
+ log('we have read {}'.format(data))
+ # data should be str and not bytes because we use
+ # universal_lines = True, but to be clean
+ # instead of saying data != ""
+ if not data:
+ log('closing stdout')
+ proc.stdout.close()
+ read_set.remove(proc.stdout)
stdout.append(data)
- if self.stderr in rlist:
- data = os.read(self.stderr.fileno(), 1024)
- if data == "":
- self.stderr.close()
- read_set.remove(self.stderr)
+ if proc.stderr in rlist:
+ # likewise (see above)
+ # data = os.read(proc.stderr.fileno(), 1024)
+ data = proc.stderr.read()
+ if not data:
+ proc.stderr.close()
+ read_set.remove(proc.stderr)
stderr.append(data)
-
+
# All data exchanged. Translate lists into strings.
if stdout is not None:
stdout = ''.join(stdout)
if stderr is not None:
stderr = ''.join(stderr)
- # Translate newlines, if requested. We cannot let the file
- # object do the translation: It is based on stdio, which is
- # impossible to combine with select (unless forcing no
- # buffering).
- if self.universal_newlines and hasattr(file, 'newlines'):
- if stdout:
- stdout = self._translate_newlines(stdout)
- if stderr:
- stderr = self._translate_newlines(stderr)
+# # Translate newlines, if requested. We cannot let the file
+# # object do the translation: It is based on stdio, which is
+# # impossible to combine with select (unless forcing no
+# # buffering).
+# if proc.universal_newlines and hasattr(file, 'newlines'):
+# if stdout:
+# stdout = proc._translate_newlines(stdout)
+# if stderr:
+# stderr = proc._translate_newlines(stderr)
if killed and err_on_timeout:
- errcode = self.poll()
- raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
+ errcode = proc.poll()
+ raise RuntimeError("Operation timed out", errcode, stdout, stderr)
else:
if killed:
- self.poll()
+ proc.poll()
else:
- self.wait()
+ proc.wait()
return (stdout, stderr)
+def _proxy_command(gw, gwuser, gwidentity):
+ """
+ Constructs the SSH ProxyCommand option to add to the SSH command to connect
+ via a proxy
+ :param gw: SSH proxy hostname
+ :type gw: str
+
+ :param gwuser: SSH proxy username
+ :type gwuser: str
+
+ :param gwidentity: SSH proxy identity file
+ :type gwidentity: str
+
+
+ :rtype: str
+
+ returns the SSH ProxyCommand option.
+ """
+
+ proxycommand = 'ProxyCommand=ssh -q '
+ if gwidentity:
+ proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
+ if gwuser:
+ proxycommand += '%s' % gwuser
+ else:
+ proxycommand += '%r'
+ proxycommand += '@%s -W %%h:%%p' % gw
+
+ return proxycommand
+