blind review on list-operations added by 2to3:
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Claudio Freire <claudio-daniel.freire@inria.fr>
19
20 ## TODO: This code needs reviewing !!!
21
22 import base64
23 import errno
24 import hashlib
25 import logging
26 import os
27 import os.path
28 import re
29 import select
30 import signal
31 import socket
32 import subprocess
33 import threading
34 import time
35 import tempfile
36
37 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
38
39 logger = logging.getLogger("sshfuncs")
40
41 def log(msg, level = logging.DEBUG, out = None, err = None):
42     if out:
43         msg += " - OUT: %s " % out
44     if err:
45         msg += " - ERROR: %s " % err
46     logger.log(level, msg)
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60     pass
61
62 class ProcStatus:
63     """
64     Codes for status of remote spawned process
65     """
66     # Process is still running
67     RUNNING = 1
68
69     # Process is finished
70     FINISHED = 2
71     
72     # Process hasn't started running yet (this should be very rare)
73     NOT_STARTED = 3
74
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
77
78 def resolve_hostname(host):
79     ip = None
80
81     if host in ["localhost", "127.0.0.1", "::1"]:
82         p = subprocess.Popen(
83             "ip -o addr list",
84             shell=True,
85             stdout=subprocess.PIPE,
86             stderr=subprocess.PIPE,
87             universal_newlines = True,
88         )
89         stdout, stderr = p.communicate()
90         m = _re_inet.findall(stdout)
91         ip = m[0][1].split("/")[0]
92     else:
93         ip = socket.gethostbyname(host)
94
95     return ip
96
97 def gethostbyname(host):
98     global hostbyname_cache
99     global hostbyname_cache_lock
100     
101     hostbyname = hostbyname_cache.get(host)
102     if not hostbyname:
103         with hostbyname_cache_lock:
104             hostbyname = resolve_hostname(host)
105             hostbyname_cache[host] = hostbyname
106
107             msg = " Added hostbyname %s - %s " % (host, hostbyname)
108             log(msg, logging.DEBUG)
109
110     return hostbyname
111
112 OPENSSH_HAS_PERSIST = None
113
114 def openssh_has_persist():
115     """ The ssh_config options ControlMaster and ControlPersist allow to
116     reuse a same network connection for multiple ssh sessions. In this 
117     way limitations on number of open ssh connections can be bypassed.
118     However, older versions of openSSH do not support this feature.
119     This function is used to determine if ssh connection persist features
120     can be used.
121     """
122     global OPENSSH_HAS_PERSIST
123     if OPENSSH_HAS_PERSIST is None:
124         proc = subprocess.Popen(
125             ["ssh", "-v"],
126             stdout = subprocess.PIPE,
127             stderr = subprocess.STDOUT,
128             stdin = subprocess.DEVNULL,
129             universal_newlines = True,
130         )
131         out,err = proc.communicate()
132         proc.wait()
133         
134         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
135         OPENSSH_HAS_PERSIST = bool(vre.match(out))
136     return OPENSSH_HAS_PERSIST
137
138 def make_server_key_args(server_key, host, port):
139     """ Returns a reference to a temporary known_hosts file, to which 
140     the server key has been added. 
141     
142     Make sure to hold onto the temp file reference until the process is 
143     done with it
144
145     :param server_key: the server public key
146     :type server_key: str
147
148     :param host: the hostname
149     :type host: str
150
151     :param port: the ssh port
152     :type port: str
153
154     """
155     if port is not None:
156         host = '%s:%s' % (host, str(port))
157
158     # Create a temporary server key file
159     tmp_known_hosts = tempfile.NamedTemporaryFile()
160    
161     hostbyname = gethostbyname(host) 
162
163     # Add the intended host key
164     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
165     
166     # If we're not in strict mode, add user-configured keys
167     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
168         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
169         if os.access(user_hosts_path, os.R_OK):
170             f = open(user_hosts_path, "r")
171             tmp_known_hosts.write(f.read())
172             f.close()
173         
174     tmp_known_hosts.flush()
175     
176     return tmp_known_hosts
177
178 def make_control_path(agent, forward_x11):
179     ctrl_path = "/tmp/nepi_ssh"
180
181     if agent:
182         ctrl_path +="_a"
183
184     if forward_x11:
185         ctrl_path +="_x"
186
187     ctrl_path += "-%r@%h:%p"
188
189     return ctrl_path
190
191 def shell_escape(s):
192     """ Escapes strings so that they are safe to use as command-line 
193     arguments """
194     if SHELL_SAFE.match(s):
195         # safe string - no escaping needed
196         return s
197     else:
198         # unsafe string - escape
199         def escape(c):
200             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
201                 return c
202             else:
203                 return "'$'\\x%02x''" % (ord(c),)
204         s = ''.join(map(escape, s))
205         return "'%s'" % (s,)
206
207 def eintr_retry(func):
208     """Retries a function invocation when a EINTR occurs"""
209     import functools
210     @functools.wraps(func)
211     def rv(*p, **kw):
212         retry = kw.pop("_retry", False)
213         for i in range(0 if retry else 4):
214             try:
215                 return func(*p, **kw)
216             except (select.error, socket.error) as args:
217                 if args[0] == errno.EINTR:
218                     continue
219                 else:
220                     raise 
221             except OSError as e:
222                 if e.errno == errno.EINTR:
223                     continue
224                 else:
225                     raise
226         else:
227             return func(*p, **kw)
228     return rv
229
230 def rexec(command, host, user, 
231         port = None,
232         gwuser = None,
233         gw = None, 
234         agent = True,
235         sudo = False,
236         identity = None,
237         server_key = None,
238         env = None,
239         tty = False,
240         connect_timeout = 30,
241         retry = 3,
242         persistent = True,
243         forward_x11 = False,
244         blocking = True,
245         strict_host_checking = True):
246     """
247     Executes a remote command, returns ((stdout,stderr),process)
248     """
249
250     tmp_known_hosts = None
251     if not gw:
252         hostip = gethostbyname(host)
253     else: hostip = None
254
255     args = ['ssh', '-C',
256             # Don't bother with localhost. Makes test easier
257             '-o', 'NoHostAuthenticationForLocalhost=yes',
258             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
259             '-o', 'ConnectionAttempts=3',
260             '-o', 'ServerAliveInterval=30',
261             '-o', 'TCPKeepAlive=yes',
262             '-o', 'Batchmode=yes',
263             '-l', user, hostip or host]
264
265     if persistent and openssh_has_persist():
266         args.extend([
267             '-o', 'ControlMaster=auto',
268             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
269             '-o', 'ControlPersist=60' ])
270
271     if not strict_host_checking:
272         # Do not check for Host key. Unsafe.
273         args.extend(['-o', 'StrictHostKeyChecking=no'])
274
275     if gw:
276         proxycommand = _proxy_command(gw, gwuser, identity)
277         args.extend(['-o', proxycommand])
278
279     if agent:
280         args.append('-A')
281
282     if port:
283         args.append('-p%d' % port)
284
285     if identity:
286         identity = os.path.expanduser(identity)
287         args.extend(('-i', identity))
288
289     if tty:
290         args.append('-t')
291         args.append('-t')
292
293     if forward_x11:
294         args.append('-X')
295
296     if server_key:
297         # Create a temporary server key file
298         tmp_known_hosts = make_server_key_args(server_key, host, port)
299         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
300
301     if sudo:
302         command = "sudo " + command
303
304     args.append(command)
305
306     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
307
308     stdout = stderr = stdin = subprocess.PIPE
309     if forward_x11:
310         stdout = stderr = stdin = None
311
312     return _retry_rexec(args, log_msg, 
313                         stderr = stderr,
314                         stdin = stdin,
315                         stdout = stdout,
316                         env = env, 
317                         retry = retry, 
318                         tmp_known_hosts = tmp_known_hosts,
319                         blocking = blocking)
320
321 def rcopy(source, dest,
322         port = None,
323         gwuser = None,
324         gw = None,
325         recursive = False,
326         identity = None,
327         server_key = None,
328         retry = 3,
329         strict_host_checking = True):
330     """
331     Copies from/to remote sites.
332     
333     Source and destination should have the user and host encoded
334     as per scp specs.
335     
336     Source can be a list of files to copy to a single destination, 
337     (in which case it is advised that the destination be a folder),
338     or a single file in a string.
339     """
340
341     # Parse destination as <user>@<server>:<path>
342     if isinstance(dest, str) and ':' in dest:
343         remspec, path = dest.split(':',1)
344     elif isinstance(source, str) and ':' in source:
345         remspec, path = source.split(':',1)
346     else:
347         raise ValueError("Both endpoints cannot be local")
348     user,host = remspec.rsplit('@',1)
349     
350     # plain scp
351     tmp_known_hosts = None
352
353     args = ['scp', '-q', '-p', '-C',
354             # 2015-06-01 Thierry: I am commenting off blowfish
355             # as this is not available on a plain ubuntu 15.04 install
356             # this IMHO is too fragile, shoud be something the user
357             # decides explicitly (so he is at least aware of that dependency)
358             # Speed up transfer using blowfish cypher specification which is 
359             # faster than the default one (3des)
360             # '-c', 'blowfish',
361             # Don't bother with localhost. Makes test easier
362             '-o', 'NoHostAuthenticationForLocalhost=yes',
363             '-o', 'ConnectTimeout=60',
364             '-o', 'ConnectionAttempts=3',
365             '-o', 'ServerAliveInterval=30',
366             '-o', 'TCPKeepAlive=yes' ]
367             
368     if port:
369         args.append('-P%d' % port)
370
371     if gw:
372         proxycommand = _proxy_command(gw, gwuser, identity)
373         args.extend(['-o', proxycommand])
374
375     if recursive:
376         args.append('-r')
377
378     if identity:
379         identity = os.path.expanduser(identity)
380         args.extend(('-i', identity))
381
382     if server_key:
383         # Create a temporary server key file
384         tmp_known_hosts = make_server_key_args(server_key, host, port)
385         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
386
387     if not strict_host_checking:
388         # Do not check for Host key. Unsafe.
389         args.extend(['-o', 'StrictHostKeyChecking=no'])
390     
391     if isinstance(source, list):
392         args.extend(source)
393     else:
394         if openssh_has_persist():
395             args.extend([
396                 '-o', 'ControlMaster=auto',
397                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
398                 ])
399         args.append(source)
400
401     if isinstance(dest, list):
402         args.extend(dest)
403     else:
404         args.append(dest)
405
406     log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
407     
408     return _retry_rexec(args, log_msg, env = None, retry = retry, 
409             tmp_known_hosts = tmp_known_hosts,
410             blocking = True)
411
412 def rspawn(command, pidfile, 
413            stdout = '/dev/null', 
414            stderr = STDOUT, 
415            stdin = '/dev/null',
416            home = None, 
417            create_home = False, 
418            sudo = False,
419            host = None, 
420            port = None, 
421            user = None, 
422            gwuser = None,
423            gw = None,
424            agent = None, 
425            identity = None, 
426            server_key = None,
427            tty = False,
428            strict_host_checking = True):
429     """
430     Spawn a remote command such that it will continue working asynchronously in 
431     background. 
432
433         :param command: The command to run, it should be a single line.
434         :type command: str
435
436         :param pidfile: Path to a file where to store the pid and ppid of the 
437                         spawned process
438         :type pidfile: str
439
440         :param stdout: Path to file to redirect standard output. 
441                        The default value is /dev/null
442         :type stdout: str
443
444         :param stderr: Path to file to redirect standard error.
445                        If the special STDOUT value is used, stderr will 
446                        be redirected to the same file as stdout
447         :type stderr: str
448
449         :param stdin: Path to a file with input to be piped into the command's standard input
450         :type stdin: str
451
452         :param home: Path to working directory folder. 
453                     It is assumed to exist unless the create_home flag is set.
454         :type home: str
455
456         :param create_home: Flag to force creation of the home folder before 
457                             running the command
458         :type create_home: bool
459  
460         :param sudo: Flag forcing execution with sudo user
461         :type sudo: bool
462         
463         :rtype: tuple
464
465         (stdout, stderr), process
466         
467         Of the spawning process, which only captures errors at spawning time.
468         Usually only useful for diagnostics.
469     """
470     # Start process in a "daemonized" way, using nohup and heavy
471     # stdin/out redirection to avoid connection issues
472     if stderr is STDOUT:
473         stderr = '&1'
474     else:
475         stderr = ' ' + stderr
476     
477     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
478         'command' : command,
479         'pidfile' : shell_escape(pidfile),
480         'stdout' : stdout,
481         'stderr' : stderr,
482         'stdin' : stdin,
483     }
484     
485     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
486             'command' : shell_escape(daemon_command),
487             'sudo' : 'sudo -S' if sudo else '',
488             'pidfile' : shell_escape(pidfile),
489             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
490             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
491         }
492
493     (out,err),proc = rexec(
494         cmd,
495         host = host,
496         port = port,
497         user = user,
498         gwuser = gwuser,
499         gw = gw,
500         agent = agent,
501         identity = identity,
502         server_key = server_key,
503         tty = tty,
504         strict_host_checking = strict_host_checking ,
505         )
506     
507     if proc.wait():
508         raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
509
510     return ((out, err), proc)
511
512 @eintr_retry
513 def rgetpid(pidfile,
514             host = None, 
515             port = None, 
516             user = None, 
517             gwuser = None,
518             gw = None,
519             agent = None, 
520             identity = None,
521             server_key = None,
522             strict_host_checking = True):
523     """
524     Returns the pid and ppid of a process from a remote file where the 
525     information was stored.
526
527         :param home: Path to directory where the pidfile is located
528         :type home: str
529
530         :param pidfile: Name of file containing the pid information
531         :type pidfile: str
532         
533         :rtype: int
534         
535         A (pid, ppid) tuple useful for calling rstatus and rkill,
536         or None if the pidfile isn't valid yet (can happen when process is staring up)
537
538     """
539     (out,err),proc = rexec(
540         "cat %(pidfile)s" % {
541             'pidfile' : pidfile,
542         },
543         host = host,
544         port = port,
545         user = user,
546         gwuser = gwuser,
547         gw = gw,
548         agent = agent,
549         identity = identity,
550         server_key = server_key,
551         strict_host_checking = strict_host_checking
552         )
553         
554     if proc.wait():
555         return None
556     
557     if out:
558         try:
559             return [ int(x) for x in out.strip().split(' ',1)) ]
560         except:
561             # Ignore, many ways to fail that don't matter that much
562             return None
563
564 @eintr_retry
565 def rstatus(pid, ppid, 
566         host = None, 
567         port = None, 
568         user = None, 
569         gwuser = None,
570         gw = None,
571         agent = None, 
572         identity = None,
573         server_key = None,
574         strict_host_checking = True):
575     """
576     Returns a code representing the the status of a remote process
577
578         :param pid: Process id of the process
579         :type pid: int
580
581         :param ppid: Parent process id of process
582         :type ppid: int
583     
584         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
585     
586     """
587     (out,err),proc = rexec(
588         # Check only by pid. pid+ppid does not always work (especially with sudo) 
589         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
590             'ppid' : ppid,
591             'pid' : pid,
592         },
593         host = host,
594         port = port,
595         user = user,
596         gwuser = gwuser,
597         gw = gw,
598         agent = agent,
599         identity = identity,
600         server_key = server_key,
601         strict_host_checking = strict_host_checking
602         )
603     
604     if proc.wait():
605         return ProcStatus.NOT_STARTED
606     
607     status = False
608     if err:
609         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
610             status = True
611     elif out:
612         status = (out.strip() == 'wait')
613     else:
614         return ProcStatus.NOT_STARTED
615     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
616
617 @eintr_retry
618 def rkill(pid, ppid,
619         host = None, 
620         port = None, 
621         user = None, 
622         gwuser = None,
623         gw = None,
624         agent = None, 
625         sudo = False,
626         identity = None, 
627         server_key = None, 
628         nowait = False,
629         strict_host_checking = True):
630     """
631     Sends a kill signal to a remote process.
632
633     First tries a SIGTERM, and if the process does not end in 10 seconds,
634     it sends a SIGKILL.
635  
636         :param pid: Process id of process to be killed
637         :type pid: int
638
639         :param ppid: Parent process id of process to be killed
640         :type ppid: int
641
642         :param sudo: Flag indicating if sudo should be used to kill the process
643         :type sudo: bool
644         
645     """
646     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
647     cmd = """
648 SUBKILL="%(subkill)s" ;
649 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
650 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
651 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
652     sleep 0.2 
653     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
654         break
655     else
656         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
657         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
658     fi
659     sleep 1.8
660 done
661 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
662     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
663     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
664 fi
665 """
666     if nowait:
667         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
668
669     (out,err),proc = rexec(
670         cmd % {
671             'ppid' : ppid,
672             'pid' : pid,
673             'sudo' : 'sudo -S' if sudo else '',
674             'subkill' : subkill,
675         },
676         host = host,
677         port = port,
678         user = user,
679         gwuser = gwuser,
680         gw = gw,
681         agent = agent,
682         identity = identity,
683         server_key = server_key,
684         strict_host_checking = strict_host_checking
685         )
686     
687     # wait, don't leave zombies around
688     proc.wait()
689
690     return (out, err), proc
691
692 def _retry_rexec(args,
693                  log_msg,
694                  stdout = subprocess.PIPE,
695                  stdin = subprocess.PIPE, 
696                  stderr = subprocess.PIPE,
697                  env = None,
698                  retry = 3,
699                  tmp_known_hosts = None,
700                  blocking = True):
701
702     for x in range(retry):
703         # display command actually invoked when debug is turned on
704         message = " ".join( [ "'{}'".format(arg) for arg in args ] )
705         log("sshfuncs: invoking {}".format(message), logging.DEBUG)
706         # connects to the remote host and starts a remote connection
707         proc = subprocess.Popen(
708             args,
709             env = env,
710             stdout = stdout,
711             stdin = stdin, 
712             stderr = stderr,
713             universal_newlines = True,
714         )        
715         # attach tempfile object to the process, to make sure the file stays
716         # alive until the process is finished with it
717         proc._known_hosts = tmp_known_hosts
718     
719         # The argument block == False forces to rexec to return immediately, 
720         # without blocking 
721         try:
722             err = out = " "
723             if blocking:
724                 #(out, err) = proc.communicate()
725                 # The method communicate was re implemented for performance issues
726                 # when using python subprocess communicate method the ssh commands 
727                 # last one minute each
728                 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
729                 out, err = _communicate(proc, input=None)
730                 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
731
732             elif stdout:
733                 out = proc.stdout.read()
734                 if proc.poll() and stderr:
735                     err = proc.stderr.read()
736
737             log(log_msg, logging.DEBUG, out, err)
738
739             if proc.poll():
740                 skip = False
741
742                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
743                     # SSH error, can safely retry
744                     skip = True 
745                 elif retry:
746                     # Probably timed out or plain failed but can retry
747                     skip = True 
748                 
749                 if skip:
750                     t = x*2
751                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
752                             t, x, " ".join(args))
753                     log(msg, logging.DEBUG)
754
755                     time.sleep(t)
756                     continue
757             break
758         except RuntimeError as e:
759             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
760             log(msg, logging.DEBUG, out, err)
761
762             if retry <= 0:
763                 raise
764             retry -= 1
765
766     return ((out, err), proc)
767
768 # POSIX
769 # Don't remove. The method communicate was re implemented for performance issues
770 def _communicate(proc, input, timeout=None, err_on_timeout=True):
771     read_set = []
772     write_set = []
773     stdout = None # Return
774     stderr = None # Return
775
776     killed = False
777
778     if timeout is not None:
779         timelimit = time.time() + timeout
780         killtime = timelimit + 4
781         bailtime = timelimit + 4
782
783     if proc.stdin:
784         # Flush stdio buffer.  This might block, if the user has
785         # been writing to .stdin in an uncontrolled fashion.
786         proc.stdin.flush()
787         if input:
788             write_set.append(proc.stdin)
789         else:
790             proc.stdin.close()
791
792     if proc.stdout:
793         read_set.append(proc.stdout)
794         stdout = []
795
796     if proc.stderr:
797         read_set.append(proc.stderr)
798         stderr = []
799
800     input_offset = 0
801     while read_set or write_set:
802         if timeout is not None:
803             curtime = time.time()
804             if timeout is None or curtime > timelimit:
805                 if curtime > bailtime:
806                     break
807                 elif curtime > killtime:
808                     signum = signal.SIGKILL
809                 else:
810                     signum = signal.SIGTERM
811                 # Lets kill it
812                 os.kill(proc.pid, signum)
813                 select_timeout = 0.5
814             else:
815                 select_timeout = timelimit - curtime + 0.1
816         else:
817             select_timeout = 1.0
818
819         if select_timeout > 1.0:
820             select_timeout = 1.0
821
822         try:
823             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
824         except select.error as e:
825             if e[0] != 4:
826                 raise
827             else:
828                 continue
829
830         if not rlist and not wlist and not xlist and proc.poll() is not None:
831             # timeout and process exited, say bye
832             break
833
834         if proc.stdin in wlist:
835             # When select has indicated that the file is writable,
836             # we can write up to PIPE_BUF bytes without risk
837             # blocking.  POSIX defines PIPE_BUF >= 512
838             bytes_written = os.write(proc.stdin.fileno(),
839                     buffer(input, input_offset, 512))
840             input_offset += bytes_written
841
842             if input_offset >= len(input):
843                 proc.stdin.close()
844                 write_set.remove(proc.stdin)
845
846         if proc.stdout in rlist:
847             # python2 version used to do this
848             # data = os.read(proc.stdout.fileno(), 1024)
849             # however this always returned bytes...
850             data = proc.stdout.read()
851             log('we have read {}'.format(data))
852             # data should be str and not bytes because we use
853             # universal_lines = True, but to be clean
854             # instead of saying data != "" 
855             if not data:
856                 log('closing stdout')
857                 proc.stdout.close()
858                 read_set.remove(proc.stdout)
859             stdout.append(data)
860
861         if proc.stderr in rlist:
862             # likewise (see above)
863             # data = os.read(proc.stderr.fileno(), 1024)
864             data = proc.stderr.read()
865             if not data:
866                 proc.stderr.close()
867                 read_set.remove(proc.stderr)
868             stderr.append(data)
869
870     # All data exchanged.  Translate lists into strings.
871     if stdout is not None:
872         stdout = ''.join(stdout)
873     if stderr is not None:
874         stderr = ''.join(stderr)
875
876 #    # Translate newlines, if requested.  We cannot let the file
877 #    # object do the translation: It is based on stdio, which is
878 #    # impossible to combine with select (unless forcing no
879 #    # buffering).
880 #    if proc.universal_newlines and hasattr(file, 'newlines'):
881 #        if stdout:
882 #            stdout = proc._translate_newlines(stdout)
883 #        if stderr:
884 #            stderr = proc._translate_newlines(stderr)
885
886     if killed and err_on_timeout:
887         errcode = proc.poll()
888         raise RuntimeError("Operation timed out", errcode, stdout, stderr)
889     else:
890         if killed:
891             proc.poll()
892         else:
893             proc.wait()
894         return (stdout, stderr)
895
896 def _proxy_command(gw, gwuser, gwidentity):
897     """
898     Constructs the SSH ProxyCommand option to add to the SSH command to connect
899     via a proxy
900         :param gw: SSH proxy hostname
901         :type gw:  str 
902        
903         :param gwuser: SSH proxy username
904         :type gwuser:  str
905
906         :param gwidentity: SSH proxy identity file 
907         :type gwidentity:  str
908
909   
910         :rtype: str 
911         
912         returns the SSH ProxyCommand option.
913     """
914
915     proxycommand = 'ProxyCommand=ssh -q '
916     if gwidentity:
917         proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
918     if gwuser:
919         proxycommand += '%s' % gwuser
920     else:
921         proxycommand += '%r'
922     proxycommand += '@%s -W %%h:%%p' % gw
923
924     return proxycommand
925