421c9b4238426fc85274cbb9e1e8a8fe001b9e13
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Claudio Freire <claudio-daniel.freire@inria.fr>
19
20 ## TODO: This code needs reviewing !!!
21
22 import base64
23 import errno
24 import hashlib
25 import logging
26 import os
27 import os.path
28 import re
29 import select
30 import signal
31 import socket
32 import subprocess
33 import threading
34 import time
35 import tempfile
36
37 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
38
39 logger = logging.getLogger("sshfuncs")
40
41 def log(msg, level = logging.DEBUG, out = None, err = None):
42     if out:
43         msg += " - OUT: %s " % out
44     if err:
45         msg += " - ERROR: %s " % err
46     logger.log(level, msg)
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60     pass
61
62 class ProcStatus:
63     """
64     Codes for status of remote spawned process
65     """
66     # Process is still running
67     RUNNING = 1
68
69     # Process is finished
70     FINISHED = 2
71     
72     # Process hasn't started running yet (this should be very rare)
73     NOT_STARTED = 3
74
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
77
78 def resolve_hostname(host):
79     ip = None
80
81     if host in ["localhost", "127.0.0.1", "::1"]:
82         p = subprocess.Popen(
83             "ip -o addr list",
84             shell=True,
85             stdout=subprocess.PIPE,
86             stderr=subprocess.PIPE,
87             universal_newlines = True,
88         )
89         stdout, stderr = p.communicate()
90         m = _re_inet.findall(stdout)
91         ip = m[0][1].split("/")[0]
92     else:
93         ip = socket.gethostbyname(host)
94
95     return ip
96
97 def gethostbyname(host):
98     global hostbyname_cache
99     global hostbyname_cache_lock
100     
101     hostbyname = hostbyname_cache.get(host)
102     if not hostbyname:
103         with hostbyname_cache_lock:
104             hostbyname = resolve_hostname(host)
105             hostbyname_cache[host] = hostbyname
106
107             msg = " Added hostbyname %s - %s " % (host, hostbyname)
108             log(msg, logging.DEBUG)
109
110     return hostbyname
111
112 OPENSSH_HAS_PERSIST = None
113
114 def openssh_has_persist():
115     """ The ssh_config options ControlMaster and ControlPersist allow to
116     reuse a same network connection for multiple ssh sessions. In this 
117     way limitations on number of open ssh connections can be bypassed.
118     However, older versions of openSSH do not support this feature.
119     This function is used to determine if ssh connection persist features
120     can be used.
121     """
122     global OPENSSH_HAS_PERSIST
123     if OPENSSH_HAS_PERSIST is None:
124         proc = subprocess.Popen(
125             ["ssh", "-v"],
126             stdout = subprocess.PIPE,
127             stderr = subprocess.STDOUT,
128             stdin = subprocess.DEVNULL,
129             universal_newlines = True,
130         )
131         out,err = proc.communicate()
132         proc.wait()
133         
134         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
135         OPENSSH_HAS_PERSIST = bool(vre.match(out))
136     return OPENSSH_HAS_PERSIST
137
138 def make_server_key_args(server_key, host, port):
139     """ Returns a reference to a temporary known_hosts file, to which 
140     the server key has been added. 
141     
142     Make sure to hold onto the temp file reference until the process is 
143     done with it
144
145     :param server_key: the server public key
146     :type server_key: str
147
148     :param host: the hostname
149     :type host: str
150
151     :param port: the ssh port
152     :type port: str
153
154     """
155     if port is not None:
156         host = '%s:%s' % (host, str(port))
157
158     # Create a temporary server key file
159     tmp_known_hosts = tempfile.NamedTemporaryFile()
160    
161     hostbyname = gethostbyname(host) 
162
163     # Add the intended host key
164     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
165     
166     # If we're not in strict mode, add user-configured keys
167     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
168         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
169         if os.access(user_hosts_path, os.R_OK):
170             with open(user_hosts_path, "r") as f:
171                 tmp_known_hosts.write(f.read())
172         
173     tmp_known_hosts.flush()
174     
175     return tmp_known_hosts
176
177 def make_control_path(agent, forward_x11):
178     ctrl_path = "/tmp/nepi_ssh"
179
180     if agent:
181         ctrl_path +="_a"
182
183     if forward_x11:
184         ctrl_path +="_x"
185
186     ctrl_path += "-%r@%h:%p"
187
188     return ctrl_path
189
190 def shell_escape(s):
191     """ Escapes strings so that they are safe to use as command-line 
192     arguments """
193     if SHELL_SAFE.match(s):
194         # safe string - no escaping needed
195         return s
196     else:
197         # unsafe string - escape
198         def escape(c):
199             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
200                 return c
201             else:
202                 return "'$'\\x%02x''" % (ord(c),)
203         s = ''.join(map(escape, s))
204         return "'%s'" % (s,)
205
206 def eintr_retry(func):
207     """Retries a function invocation when a EINTR occurs"""
208     import functools
209     @functools.wraps(func)
210     def rv(*p, **kw):
211         retry = kw.pop("_retry", False)
212         for i in range(0 if retry else 4):
213             try:
214                 return func(*p, **kw)
215             except (select.error, socket.error) as args:
216                 if args[0] == errno.EINTR:
217                     continue
218                 else:
219                     raise 
220             except OSError as e:
221                 if e.errno == errno.EINTR:
222                     continue
223                 else:
224                     raise
225         else:
226             return func(*p, **kw)
227     return rv
228
229 def rexec(command, host, user, 
230         port = None,
231         gwuser = None,
232         gw = None, 
233         agent = True,
234         sudo = False,
235         identity = None,
236         server_key = None,
237         env = None,
238         tty = False,
239         connect_timeout = 30,
240         retry = 3,
241         persistent = True,
242         forward_x11 = False,
243         blocking = True,
244         strict_host_checking = True):
245     """
246     Executes a remote command, returns ((stdout,stderr),process)
247     """
248
249     tmp_known_hosts = None
250     if not gw:
251         hostip = gethostbyname(host)
252     else: hostip = None
253
254     args = ['ssh', '-C',
255             # Don't bother with localhost. Makes test easier
256             '-o', 'NoHostAuthenticationForLocalhost=yes',
257             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
258             '-o', 'ConnectionAttempts=3',
259             '-o', 'ServerAliveInterval=30',
260             '-o', 'TCPKeepAlive=yes',
261             '-o', 'Batchmode=yes',
262             '-l', user, hostip or host]
263
264     if persistent and openssh_has_persist():
265         args.extend([
266             '-o', 'ControlMaster=auto',
267             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
268             '-o', 'ControlPersist=60' ])
269
270     if not strict_host_checking:
271         # Do not check for Host key. Unsafe.
272         args.extend(['-o', 'StrictHostKeyChecking=no'])
273
274     if gw:
275         proxycommand = _proxy_command(gw, gwuser, identity)
276         args.extend(['-o', proxycommand])
277
278     if agent:
279         args.append('-A')
280
281     if port:
282         args.append('-p%d' % port)
283
284     if identity:
285         identity = os.path.expanduser(identity)
286         args.extend(('-i', identity))
287
288     if tty:
289         args.append('-t')
290         args.append('-t')
291
292     if forward_x11:
293         args.append('-X')
294
295     if server_key:
296         # Create a temporary server key file
297         tmp_known_hosts = make_server_key_args(server_key, host, port)
298         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
299
300     if sudo:
301         command = "sudo " + command
302
303     args.append(command)
304
305     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
306
307     stdout = stderr = stdin = subprocess.PIPE
308     if forward_x11:
309         stdout = stderr = stdin = None
310
311     return _retry_rexec(args, log_msg, 
312                         stderr = stderr,
313                         stdin = stdin,
314                         stdout = stdout,
315                         env = env, 
316                         retry = retry, 
317                         tmp_known_hosts = tmp_known_hosts,
318                         blocking = blocking)
319
320 def rcopy(source, dest,
321         port = None,
322         gwuser = None,
323         gw = None,
324         recursive = False,
325         identity = None,
326         server_key = None,
327         retry = 3,
328         strict_host_checking = True):
329     """
330     Copies from/to remote sites.
331     
332     Source and destination should have the user and host encoded
333     as per scp specs.
334     
335     Source can be a list of files to copy to a single destination, 
336     (in which case it is advised that the destination be a folder),
337     or a single file in a string.
338     """
339
340     # Parse destination as <user>@<server>:<path>
341     if isinstance(dest, str) and ':' in dest:
342         remspec, path = dest.split(':',1)
343     elif isinstance(source, str) and ':' in source:
344         remspec, path = source.split(':',1)
345     else:
346         raise ValueError("Both endpoints cannot be local")
347     user,host = remspec.rsplit('@',1)
348     
349     # plain scp
350     tmp_known_hosts = None
351
352     args = ['scp', '-q', '-p', '-C',
353             # 2015-06-01 Thierry: I am commenting off blowfish
354             # as this is not available on a plain ubuntu 15.04 install
355             # this IMHO is too fragile, shoud be something the user
356             # decides explicitly (so he is at least aware of that dependency)
357             # Speed up transfer using blowfish cypher specification which is 
358             # faster than the default one (3des)
359             # '-c', 'blowfish',
360             # Don't bother with localhost. Makes test easier
361             '-o', 'NoHostAuthenticationForLocalhost=yes',
362             '-o', 'ConnectTimeout=60',
363             '-o', 'ConnectionAttempts=3',
364             '-o', 'ServerAliveInterval=30',
365             '-o', 'TCPKeepAlive=yes' ]
366             
367     if port:
368         args.append('-P%d' % port)
369
370     if gw:
371         proxycommand = _proxy_command(gw, gwuser, identity)
372         args.extend(['-o', proxycommand])
373
374     if recursive:
375         args.append('-r')
376
377     if identity:
378         identity = os.path.expanduser(identity)
379         args.extend(('-i', identity))
380
381     if server_key:
382         # Create a temporary server key file
383         tmp_known_hosts = make_server_key_args(server_key, host, port)
384         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
385
386     if not strict_host_checking:
387         # Do not check for Host key. Unsafe.
388         args.extend(['-o', 'StrictHostKeyChecking=no'])
389     
390     if isinstance(source, list):
391         args.extend(source)
392     else:
393         if openssh_has_persist():
394             args.extend([
395                 '-o', 'ControlMaster=auto',
396                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
397                 ])
398         args.append(source)
399
400     if isinstance(dest, list):
401         args.extend(dest)
402     else:
403         args.append(dest)
404
405     log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
406     
407     return _retry_rexec(args, log_msg, env = None, retry = retry, 
408             tmp_known_hosts = tmp_known_hosts,
409             blocking = True)
410
411 def rspawn(command, pidfile, 
412            stdout = '/dev/null', 
413            stderr = STDOUT, 
414            stdin = '/dev/null',
415            home = None, 
416            create_home = False, 
417            sudo = False,
418            host = None, 
419            port = None, 
420            user = None, 
421            gwuser = None,
422            gw = None,
423            agent = None, 
424            identity = None, 
425            server_key = None,
426            tty = False,
427            strict_host_checking = True):
428     """
429     Spawn a remote command such that it will continue working asynchronously in 
430     background. 
431
432         :param command: The command to run, it should be a single line.
433         :type command: str
434
435         :param pidfile: Path to a file where to store the pid and ppid of the 
436                         spawned process
437         :type pidfile: str
438
439         :param stdout: Path to file to redirect standard output. 
440                        The default value is /dev/null
441         :type stdout: str
442
443         :param stderr: Path to file to redirect standard error.
444                        If the special STDOUT value is used, stderr will 
445                        be redirected to the same file as stdout
446         :type stderr: str
447
448         :param stdin: Path to a file with input to be piped into the command's standard input
449         :type stdin: str
450
451         :param home: Path to working directory folder. 
452                     It is assumed to exist unless the create_home flag is set.
453         :type home: str
454
455         :param create_home: Flag to force creation of the home folder before 
456                             running the command
457         :type create_home: bool
458  
459         :param sudo: Flag forcing execution with sudo user
460         :type sudo: bool
461         
462         :rtype: tuple
463
464         (stdout, stderr), process
465         
466         Of the spawning process, which only captures errors at spawning time.
467         Usually only useful for diagnostics.
468     """
469     # Start process in a "daemonized" way, using nohup and heavy
470     # stdin/out redirection to avoid connection issues
471     if stderr is STDOUT:
472         stderr = '&1'
473     else:
474         stderr = ' ' + stderr
475     
476     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
477         'command' : command,
478         'pidfile' : shell_escape(pidfile),
479         'stdout' : stdout,
480         'stderr' : stderr,
481         'stdin' : stdin,
482     }
483     
484     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
485             'command' : shell_escape(daemon_command),
486             'sudo' : 'sudo -S' if sudo else '',
487             'pidfile' : shell_escape(pidfile),
488             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
489             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
490         }
491
492     (out,err),proc = rexec(
493         cmd,
494         host = host,
495         port = port,
496         user = user,
497         gwuser = gwuser,
498         gw = gw,
499         agent = agent,
500         identity = identity,
501         server_key = server_key,
502         tty = tty,
503         strict_host_checking = strict_host_checking ,
504         )
505     
506     if proc.wait():
507         raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
508
509     return ((out, err), proc)
510
511 @eintr_retry
512 def rgetpid(pidfile,
513             host = None, 
514             port = None, 
515             user = None, 
516             gwuser = None,
517             gw = None,
518             agent = None, 
519             identity = None,
520             server_key = None,
521             strict_host_checking = True):
522     """
523     Returns the pid and ppid of a process from a remote file where the 
524     information was stored.
525
526         :param home: Path to directory where the pidfile is located
527         :type home: str
528
529         :param pidfile: Name of file containing the pid information
530         :type pidfile: str
531         
532         :rtype: int
533         
534         A (pid, ppid) tuple useful for calling rstatus and rkill,
535         or None if the pidfile isn't valid yet (can happen when process is staring up)
536
537     """
538     (out,err),proc = rexec(
539         "cat %(pidfile)s" % {
540             'pidfile' : pidfile,
541         },
542         host = host,
543         port = port,
544         user = user,
545         gwuser = gwuser,
546         gw = gw,
547         agent = agent,
548         identity = identity,
549         server_key = server_key,
550         strict_host_checking = strict_host_checking
551         )
552         
553     if proc.wait():
554         return None
555     
556     if out:
557         try:
558             return [ int(x) for x in out.strip().split(' ', 1) ]
559         except:
560             # Ignore, many ways to fail that don't matter that much
561             return None
562
563 @eintr_retry
564 def rstatus(pid, ppid, 
565         host = None, 
566         port = None, 
567         user = None, 
568         gwuser = None,
569         gw = None,
570         agent = None, 
571         identity = None,
572         server_key = None,
573         strict_host_checking = True):
574     """
575     Returns a code representing the the status of a remote process
576
577         :param pid: Process id of the process
578         :type pid: int
579
580         :param ppid: Parent process id of process
581         :type ppid: int
582     
583         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
584     
585     """
586     (out,err),proc = rexec(
587         # Check only by pid. pid+ppid does not always work (especially with sudo) 
588         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
589             'ppid' : ppid,
590             'pid' : pid,
591         },
592         host = host,
593         port = port,
594         user = user,
595         gwuser = gwuser,
596         gw = gw,
597         agent = agent,
598         identity = identity,
599         server_key = server_key,
600         strict_host_checking = strict_host_checking
601         )
602     
603     if proc.wait():
604         return ProcStatus.NOT_STARTED
605     
606     status = False
607     if err:
608         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
609             status = True
610     elif out:
611         status = (out.strip() == 'wait')
612     else:
613         return ProcStatus.NOT_STARTED
614     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
615
616 @eintr_retry
617 def rkill(pid, ppid,
618         host = None, 
619         port = None, 
620         user = None, 
621         gwuser = None,
622         gw = None,
623         agent = None, 
624         sudo = False,
625         identity = None, 
626         server_key = None, 
627         nowait = False,
628         strict_host_checking = True):
629     """
630     Sends a kill signal to a remote process.
631
632     First tries a SIGTERM, and if the process does not end in 10 seconds,
633     it sends a SIGKILL.
634  
635         :param pid: Process id of process to be killed
636         :type pid: int
637
638         :param ppid: Parent process id of process to be killed
639         :type ppid: int
640
641         :param sudo: Flag indicating if sudo should be used to kill the process
642         :type sudo: bool
643         
644     """
645     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
646     cmd = """
647 SUBKILL="%(subkill)s" ;
648 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
649 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
650 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
651     sleep 0.2 
652     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
653         break
654     else
655         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
656         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
657     fi
658     sleep 1.8
659 done
660 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
661     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
662     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
663 fi
664 """
665     if nowait:
666         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
667
668     (out,err),proc = rexec(
669         cmd % {
670             'ppid' : ppid,
671             'pid' : pid,
672             'sudo' : 'sudo -S' if sudo else '',
673             'subkill' : subkill,
674         },
675         host = host,
676         port = port,
677         user = user,
678         gwuser = gwuser,
679         gw = gw,
680         agent = agent,
681         identity = identity,
682         server_key = server_key,
683         strict_host_checking = strict_host_checking
684         )
685     
686     # wait, don't leave zombies around
687     proc.wait()
688
689     return (out, err), proc
690
691 def _retry_rexec(args,
692                  log_msg,
693                  stdout = subprocess.PIPE,
694                  stdin = subprocess.PIPE, 
695                  stderr = subprocess.PIPE,
696                  env = None,
697                  retry = 3,
698                  tmp_known_hosts = None,
699                  blocking = True):
700
701     for x in range(retry):
702         # display command actually invoked when debug is turned on
703         message = " ".join( [ "'{}'".format(arg) for arg in args ] )
704         log("sshfuncs: invoking {}".format(message), logging.DEBUG)
705         # connects to the remote host and starts a remote connection
706         proc = subprocess.Popen(
707             args,
708             env = env,
709             stdout = stdout,
710             stdin = stdin, 
711             stderr = stderr,
712             universal_newlines = True,
713         )        
714         # attach tempfile object to the process, to make sure the file stays
715         # alive until the process is finished with it
716         proc._known_hosts = tmp_known_hosts
717     
718         # The argument block == False forces to rexec to return immediately, 
719         # without blocking 
720         try:
721             err = out = " "
722             if blocking:
723                 #(out, err) = proc.communicate()
724                 # The method communicate was re implemented for performance issues
725                 # when using python subprocess communicate method the ssh commands 
726                 # last one minute each
727                 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
728                 out, err = _communicate(proc, input=None)
729                 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
730
731             elif stdout:
732                 out = proc.stdout.read()
733                 if proc.poll() and stderr:
734                     err = proc.stderr.read()
735
736             log(log_msg, logging.DEBUG, out, err)
737
738             if proc.poll():
739                 skip = False
740
741                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
742                     # SSH error, can safely retry
743                     skip = True 
744                 elif retry:
745                     # Probably timed out or plain failed but can retry
746                     skip = True 
747                 
748                 if skip:
749                     t = x*2
750                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
751                             t, x, " ".join(args))
752                     log(msg, logging.DEBUG)
753
754                     time.sleep(t)
755                     continue
756             break
757         except RuntimeError as e:
758             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
759             log(msg, logging.DEBUG, out, err)
760
761             if retry <= 0:
762                 raise
763             retry -= 1
764
765     return ((out, err), proc)
766
767 # POSIX
768 # Don't remove. The method communicate was re implemented for performance issues
769 def _communicate(proc, input, timeout=None, err_on_timeout=True):
770     read_set = []
771     write_set = []
772     stdout = None # Return
773     stderr = None # Return
774
775     killed = False
776
777     if timeout is not None:
778         timelimit = time.time() + timeout
779         killtime = timelimit + 4
780         bailtime = timelimit + 4
781
782     if proc.stdin:
783         # Flush stdio buffer.  This might block, if the user has
784         # been writing to .stdin in an uncontrolled fashion.
785         proc.stdin.flush()
786         if input:
787             write_set.append(proc.stdin)
788         else:
789             proc.stdin.close()
790
791     if proc.stdout:
792         read_set.append(proc.stdout)
793         stdout = []
794
795     if proc.stderr:
796         read_set.append(proc.stderr)
797         stderr = []
798
799     input_offset = 0
800     while read_set or write_set:
801         if timeout is not None:
802             curtime = time.time()
803             if timeout is None or curtime > timelimit:
804                 if curtime > bailtime:
805                     break
806                 elif curtime > killtime:
807                     signum = signal.SIGKILL
808                 else:
809                     signum = signal.SIGTERM
810                 # Lets kill it
811                 os.kill(proc.pid, signum)
812                 select_timeout = 0.5
813             else:
814                 select_timeout = timelimit - curtime + 0.1
815         else:
816             select_timeout = 1.0
817
818         if select_timeout > 1.0:
819             select_timeout = 1.0
820
821         try:
822             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
823         except select.error as e:
824             if e[0] != 4:
825                 raise
826             else:
827                 continue
828
829         if not rlist and not wlist and not xlist and proc.poll() is not None:
830             # timeout and process exited, say bye
831             break
832
833         if proc.stdin in wlist:
834             # When select has indicated that the file is writable,
835             # we can write up to PIPE_BUF bytes without risk
836             # blocking.  POSIX defines PIPE_BUF >= 512
837             bytes_written = os.write(proc.stdin.fileno(),
838                     buffer(input, input_offset, 512))
839             input_offset += bytes_written
840
841             if input_offset >= len(input):
842                 proc.stdin.close()
843                 write_set.remove(proc.stdin)
844
845         if proc.stdout in rlist:
846             # python2 version used to do this
847             # data = os.read(proc.stdout.fileno(), 1024)
848             # however this always returned bytes...
849             data = proc.stdout.read()
850             log('we have read {}'.format(data))
851             # data should be str and not bytes because we use
852             # universal_lines = True, but to be clean
853             # instead of saying data != "" 
854             if not data:
855                 log('closing stdout')
856                 proc.stdout.close()
857                 read_set.remove(proc.stdout)
858             stdout.append(data)
859
860         if proc.stderr in rlist:
861             # likewise (see above)
862             # data = os.read(proc.stderr.fileno(), 1024)
863             data = proc.stderr.read()
864             if not data:
865                 proc.stderr.close()
866                 read_set.remove(proc.stderr)
867             stderr.append(data)
868
869     # All data exchanged.  Translate lists into strings.
870     if stdout is not None:
871         stdout = ''.join(stdout)
872     if stderr is not None:
873         stderr = ''.join(stderr)
874
875 #    # Translate newlines, if requested.  We cannot let the file
876 #    # object do the translation: It is based on stdio, which is
877 #    # impossible to combine with select (unless forcing no
878 #    # buffering).
879 #    if proc.universal_newlines and hasattr(file, 'newlines'):
880 #        if stdout:
881 #            stdout = proc._translate_newlines(stdout)
882 #        if stderr:
883 #            stderr = proc._translate_newlines(stderr)
884
885     if killed and err_on_timeout:
886         errcode = proc.poll()
887         raise RuntimeError("Operation timed out", errcode, stdout, stderr)
888     else:
889         if killed:
890             proc.poll()
891         else:
892             proc.wait()
893         return (stdout, stderr)
894
895 def _proxy_command(gw, gwuser, gwidentity):
896     """
897     Constructs the SSH ProxyCommand option to add to the SSH command to connect
898     via a proxy
899         :param gw: SSH proxy hostname
900         :type gw:  str 
901        
902         :param gwuser: SSH proxy username
903         :type gwuser:  str
904
905         :param gwidentity: SSH proxy identity file 
906         :type gwidentity:  str
907
908   
909         :rtype: str 
910         
911         returns the SSH ProxyCommand option.
912     """
913
914     proxycommand = 'ProxyCommand=ssh -q '
915     if gwidentity:
916         proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
917     if gwuser:
918         proxycommand += '%s' % gwuser
919     else:
920         proxycommand += '%r'
921     proxycommand += '@%s -W %%h:%%p' % gw
922
923     return proxycommand
924