locally executed commands need to set universal_newlines too in py3
[nepi.git] / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Claudio Freire <claudio-daniel.freire@inria.fr>
19
20 ## TODO: This code needs reviewing !!!
21
22 import base64
23 import errno
24 import hashlib
25 import logging
26 import os
27 import os.path
28 import re
29 import select
30 import signal
31 import socket
32 import subprocess
33 import threading
34 import time
35 import tempfile
36
37 from six import PY2
38
39 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
40
41 logger = logging.getLogger("sshfuncs")
42
43 def log(msg, level = logging.DEBUG, out = None, err = None):
44     if out:
45         msg += " - OUT: %s " % out
46     if err:
47         msg += " - ERROR: %s " % err
48     logger.log(level, msg)
49
50 if hasattr(os, "devnull"):
51     DEV_NULL = os.devnull
52 else:
53     DEV_NULL = "/dev/null"
54
55 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
56
57 class STDOUT: 
58     """
59     Special value that when given to rspawn in stderr causes stderr to 
60     redirect to whatever stdout was redirected to.
61     """
62     pass
63
64 class ProcStatus:
65     """
66     Codes for status of remote spawned process
67     """
68     # Process is still running
69     RUNNING = 1
70
71     # Process is finished
72     FINISHED = 2
73     
74     # Process hasn't started running yet (this should be very rare)
75     NOT_STARTED = 3
76
77 hostbyname_cache = dict()
78 hostbyname_cache_lock = threading.Lock()
79
80 def resolve_hostname(host):
81     ip = None
82
83     if host in ["localhost", "127.0.0.1", "::1"]:
84         extras = {} if PY2 else {'universal_newlines' : True}
85         p = subprocess.Popen(
86             "ip -o addr list",
87             shell=True,
88             stdout=subprocess.PIPE,
89             stderr=subprocess.PIPE,
90             **extras
91         )
92         stdout, stderr = p.communicate()
93         m = _re_inet.findall(stdout)
94         ip = m[0][1].split("/")[0]
95     else:
96         ip = socket.gethostbyname(host)
97
98     return ip
99
100 def gethostbyname(host):
101     global hostbyname_cache
102     global hostbyname_cache_lock
103     
104     hostbyname = hostbyname_cache.get(host)
105     if not hostbyname:
106         with hostbyname_cache_lock:
107             hostbyname = resolve_hostname(host)
108             hostbyname_cache[host] = hostbyname
109
110             msg = " Added hostbyname %s - %s " % (host, hostbyname)
111             log(msg, logging.DEBUG)
112
113     return hostbyname
114
115 OPENSSH_HAS_PERSIST = None
116
117 def openssh_has_persist():
118     """ The ssh_config options ControlMaster and ControlPersist allow to
119     reuse a same network connection for multiple ssh sessions. In this 
120     way limitations on number of open ssh connections can be bypassed.
121     However, older versions of openSSH do not support this feature.
122     This function is used to determine if ssh connection persist features
123     can be used.
124     """
125     global OPENSSH_HAS_PERSIST
126     if OPENSSH_HAS_PERSIST is None:
127         extras = {} if PY2 else {'universal_newlines' : True}
128         with open("/dev/null") as null:
129             proc = subprocess.Popen(
130                 ["ssh", "-v"],
131                 stdout = subprocess.PIPE,
132                 stderr = subprocess.STDOUT,
133                 stdin = null,
134                 **extras
135             )
136             out,err = proc.communicate()
137             proc.wait()
138         
139         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
140         OPENSSH_HAS_PERSIST = bool(vre.match(out))
141     return OPENSSH_HAS_PERSIST
142
143 def make_server_key_args(server_key, host, port):
144     """ Returns a reference to a temporary known_hosts file, to which 
145     the server key has been added. 
146     
147     Make sure to hold onto the temp file reference until the process is 
148     done with it
149
150     :param server_key: the server public key
151     :type server_key: str
152
153     :param host: the hostname
154     :type host: str
155
156     :param port: the ssh port
157     :type port: str
158
159     """
160     if port is not None:
161         host = '%s:%s' % (host, str(port))
162
163     # Create a temporary server key file
164     tmp_known_hosts = tempfile.NamedTemporaryFile()
165    
166     hostbyname = gethostbyname(host) 
167
168     # Add the intended host key
169     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
170     
171     # If we're not in strict mode, add user-configured keys
172     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
173         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
174         if os.access(user_hosts_path, os.R_OK):
175             with open(user_hosts_path, "r") as f:
176                 tmp_known_hosts.write(f.read())
177         
178     tmp_known_hosts.flush()
179     
180     return tmp_known_hosts
181
182 def make_control_path(agent, forward_x11):
183     ctrl_path = "/tmp/nepi_ssh"
184
185     if agent:
186         ctrl_path +="_a"
187
188     if forward_x11:
189         ctrl_path +="_x"
190
191     ctrl_path += "-%r@%h:%p"
192
193     return ctrl_path
194
195 def shell_escape(s):
196     """ Escapes strings so that they are safe to use as command-line 
197     arguments """
198     if SHELL_SAFE.match(s):
199         # safe string - no escaping needed
200         return s
201     else:
202         # unsafe string - escape
203         def escape(c):
204             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
205                 return c
206             else:
207                 return "'$'\\x%02x''" % (ord(c),)
208         s = ''.join(map(escape, s))
209         return "'%s'" % (s,)
210
211 def eintr_retry(func):
212     """Retries a function invocation when a EINTR occurs"""
213     import functools
214     @functools.wraps(func)
215     def rv(*p, **kw):
216         retry = kw.pop("_retry", False)
217         for i in range(0 if retry else 4):
218             try:
219                 return func(*p, **kw)
220             except (select.error, socket.error) as args:
221                 if args[0] == errno.EINTR:
222                     continue
223                 else:
224                     raise 
225             except OSError as e:
226                 if e.errno == errno.EINTR:
227                     continue
228                 else:
229                     raise
230         else:
231             return func(*p, **kw)
232     return rv
233
234 def rexec(command, host, user, 
235         port = None,
236         gwuser = None,
237         gw = None, 
238         agent = True,
239         sudo = False,
240         identity = None,
241         server_key = None,
242         env = None,
243         tty = False,
244         connect_timeout = 30,
245         retry = 3,
246         persistent = True,
247         forward_x11 = False,
248         blocking = True,
249         strict_host_checking = True):
250     """
251     Executes a remote command, returns ((stdout,stderr),process)
252     """
253
254     tmp_known_hosts = None
255     if not gw:
256         hostip = gethostbyname(host)
257     else: hostip = None
258
259     args = ['ssh', '-C',
260             # Don't bother with localhost. Makes test easier
261             '-o', 'NoHostAuthenticationForLocalhost=yes',
262             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
263             '-o', 'ConnectionAttempts=3',
264             '-o', 'ServerAliveInterval=30',
265             '-o', 'TCPKeepAlive=yes',
266             '-o', 'Batchmode=yes',
267             '-l', user, hostip or host]
268
269     if persistent and openssh_has_persist():
270         args.extend([
271             '-o', 'ControlMaster=auto',
272             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
273             '-o', 'ControlPersist=60' ])
274
275     if not strict_host_checking:
276         # Do not check for Host key. Unsafe.
277         args.extend(['-o', 'StrictHostKeyChecking=no'])
278
279     if gw:
280         proxycommand = _proxy_command(gw, gwuser, identity)
281         args.extend(['-o', proxycommand])
282
283     if agent:
284         args.append('-A')
285
286     if port:
287         args.append('-p%d' % port)
288
289     if identity:
290         identity = os.path.expanduser(identity)
291         args.extend(('-i', identity))
292
293     if tty:
294         args.append('-t')
295         args.append('-t')
296
297     if forward_x11:
298         args.append('-X')
299
300     if server_key:
301         # Create a temporary server key file
302         tmp_known_hosts = make_server_key_args(server_key, host, port)
303         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
304
305     if sudo:
306         command = "sudo " + command
307
308     args.append(command)
309
310     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
311
312     stdout = stderr = stdin = subprocess.PIPE
313     if forward_x11:
314         stdout = stderr = stdin = None
315
316     return _retry_rexec(args, log_msg, 
317                         stderr = stderr,
318                         stdin = stdin,
319                         stdout = stdout,
320                         env = env, 
321                         retry = retry, 
322                         tmp_known_hosts = tmp_known_hosts,
323                         blocking = blocking)
324
325 def rcopy(source, dest,
326           port = None,
327           gwuser = None,
328           gw = None,
329           recursive = False,
330           identity = None,
331           server_key = None,
332           retry = 3,
333           strict_host_checking = True):
334     """
335     Copies from/to remote sites.
336     
337     Source and destination should have the user and host encoded
338     as per scp specs.
339     
340     Source can be a list of files to copy to a single destination, 
341     (in which case it is advised that the destination be a folder),
342     or a single file in a string.
343     """
344
345     # Parse destination as <user>@<server>:<path>
346     if isinstance(dest, str) and ':' in dest:
347         remspec, path = dest.split(':',1)
348     elif isinstance(source, str) and ':' in source:
349         remspec, path = source.split(':',1)
350     else:
351         raise ValueError("Both endpoints cannot be local")
352     user,host = remspec.rsplit('@',1)
353     
354     # plain scp
355     tmp_known_hosts = None
356
357     args = ['scp', '-q', '-p', '-C',
358             # 2015-06-01 Thierry: I am commenting off blowfish
359             # as this is not available on a plain ubuntu 15.04 install
360             # this IMHO is too fragile, shoud be something the user
361             # decides explicitly (so he is at least aware of that dependency)
362             # Speed up transfer using blowfish cypher specification which is 
363             # faster than the default one (3des)
364             # '-c', 'blowfish',
365             # Don't bother with localhost. Makes test easier
366             '-o', 'NoHostAuthenticationForLocalhost=yes',
367             '-o', 'ConnectTimeout=60',
368             '-o', 'ConnectionAttempts=3',
369             '-o', 'ServerAliveInterval=30',
370             '-o', 'TCPKeepAlive=yes' ]
371             
372     if port:
373         args.append('-P%d' % port)
374
375     if gw:
376         proxycommand = _proxy_command(gw, gwuser, identity)
377         args.extend(['-o', proxycommand])
378
379     if recursive:
380         args.append('-r')
381
382     if identity:
383         identity = os.path.expanduser(identity)
384         args.extend(('-i', identity))
385
386     if server_key:
387         # Create a temporary server key file
388         tmp_known_hosts = make_server_key_args(server_key, host, port)
389         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
390
391     if not strict_host_checking:
392         # Do not check for Host key. Unsafe.
393         args.extend(['-o', 'StrictHostKeyChecking=no'])
394     
395     if isinstance(source, list):
396         args.extend(source)
397     else:
398         if openssh_has_persist():
399             args.extend([
400                 '-o', 'ControlMaster=auto',
401                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
402                 ])
403         args.append(source)
404
405     if isinstance(dest, list):
406         args.extend(dest)
407     else:
408         args.append(dest)
409
410     log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
411     
412     return _retry_rexec(args, log_msg, env = None, retry = retry, 
413             tmp_known_hosts = tmp_known_hosts,
414             blocking = True)
415
416 def rspawn(command, pidfile, 
417            stdout = '/dev/null', 
418            stderr = STDOUT, 
419            stdin = '/dev/null',
420            home = None, 
421            create_home = False, 
422            sudo = False,
423            host = None, 
424            port = None, 
425            user = None, 
426            gwuser = None,
427            gw = None,
428            agent = None, 
429            identity = None, 
430            server_key = None,
431            tty = False,
432            strict_host_checking = True):
433     """
434     Spawn a remote command such that it will continue working asynchronously in 
435     background. 
436
437         :param command: The command to run, it should be a single line.
438         :type command: str
439
440         :param pidfile: Path to a file where to store the pid and ppid of the 
441                         spawned process
442         :type pidfile: str
443
444         :param stdout: Path to file to redirect standard output. 
445                        The default value is /dev/null
446         :type stdout: str
447
448         :param stderr: Path to file to redirect standard error.
449                        If the special STDOUT value is used, stderr will 
450                        be redirected to the same file as stdout
451         :type stderr: str
452
453         :param stdin: Path to a file with input to be piped into the command's standard input
454         :type stdin: str
455
456         :param home: Path to working directory folder. 
457                     It is assumed to exist unless the create_home flag is set.
458         :type home: str
459
460         :param create_home: Flag to force creation of the home folder before 
461                             running the command
462         :type create_home: bool
463  
464         :param sudo: Flag forcing execution with sudo user
465         :type sudo: bool
466         
467         :rtype: tuple
468
469         (stdout, stderr), process
470         
471         Of the spawning process, which only captures errors at spawning time.
472         Usually only useful for diagnostics.
473     """
474     # Start process in a "daemonized" way, using nohup and heavy
475     # stdin/out redirection to avoid connection issues
476     if stderr is STDOUT:
477         stderr = '&1'
478     else:
479         stderr = ' ' + stderr
480     
481     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
482         'command' : command,
483         'pidfile' : shell_escape(pidfile),
484         'stdout' : stdout,
485         'stderr' : stderr,
486         'stdin' : stdin,
487     }
488     
489     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
490             'command' : shell_escape(daemon_command),
491             'sudo' : 'sudo -S' if sudo else '',
492             'pidfile' : shell_escape(pidfile),
493             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
494             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
495         }
496
497     (out,err),proc = rexec(
498         cmd,
499         host = host,
500         port = port,
501         user = user,
502         gwuser = gwuser,
503         gw = gw,
504         agent = agent,
505         identity = identity,
506         server_key = server_key,
507         tty = tty,
508         strict_host_checking = strict_host_checking ,
509         )
510     
511     if proc.wait():
512         raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
513
514     return ((out, err), proc)
515
516 @eintr_retry
517 def rgetpid(pidfile,
518             host = None, 
519             port = None, 
520             user = None, 
521             gwuser = None,
522             gw = None,
523             agent = None, 
524             identity = None,
525             server_key = None,
526             strict_host_checking = True):
527     """
528     Returns the pid and ppid of a process from a remote file where the 
529     information was stored.
530
531         :param home: Path to directory where the pidfile is located
532         :type home: str
533
534         :param pidfile: Name of file containing the pid information
535         :type pidfile: str
536         
537         :rtype: int
538         
539         A (pid, ppid) tuple useful for calling rstatus and rkill,
540         or None if the pidfile isn't valid yet (can happen when process is staring up)
541
542     """
543     (out,err),proc = rexec(
544         "cat %(pidfile)s" % {
545             'pidfile' : pidfile,
546         },
547         host = host,
548         port = port,
549         user = user,
550         gwuser = gwuser,
551         gw = gw,
552         agent = agent,
553         identity = identity,
554         server_key = server_key,
555         strict_host_checking = strict_host_checking
556         )
557         
558     if proc.wait():
559         return None
560     
561     if out:
562         try:
563             return [ int(x) for x in out.strip().split(' ', 1) ]
564         except:
565             # Ignore, many ways to fail that don't matter that much
566             return None
567
568 @eintr_retry
569 def rstatus(pid, ppid, 
570         host = None, 
571         port = None, 
572         user = None, 
573         gwuser = None,
574         gw = None,
575         agent = None, 
576         identity = None,
577         server_key = None,
578         strict_host_checking = True):
579     """
580     Returns a code representing the the status of a remote process
581
582         :param pid: Process id of the process
583         :type pid: int
584
585         :param ppid: Parent process id of process
586         :type ppid: int
587     
588         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
589     
590     """
591     (out,err),proc = rexec(
592         # Check only by pid. pid+ppid does not always work (especially with sudo) 
593         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
594             'ppid' : ppid,
595             'pid' : pid,
596         },
597         host = host,
598         port = port,
599         user = user,
600         gwuser = gwuser,
601         gw = gw,
602         agent = agent,
603         identity = identity,
604         server_key = server_key,
605         strict_host_checking = strict_host_checking
606         )
607     
608     if proc.wait():
609         return ProcStatus.NOT_STARTED
610     
611     status = False
612     if err:
613         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
614             status = True
615     elif out:
616         status = (out.strip() == 'wait')
617     else:
618         return ProcStatus.NOT_STARTED
619     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
620
621 @eintr_retry
622 def rkill(pid, ppid,
623         host = None, 
624         port = None, 
625         user = None, 
626         gwuser = None,
627         gw = None,
628         agent = None, 
629         sudo = False,
630         identity = None, 
631         server_key = None, 
632         nowait = False,
633         strict_host_checking = True):
634     """
635     Sends a kill signal to a remote process.
636
637     First tries a SIGTERM, and if the process does not end in 10 seconds,
638     it sends a SIGKILL.
639  
640         :param pid: Process id of process to be killed
641         :type pid: int
642
643         :param ppid: Parent process id of process to be killed
644         :type ppid: int
645
646         :param sudo: Flag indicating if sudo should be used to kill the process
647         :type sudo: bool
648         
649     """
650     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
651     cmd = """
652 SUBKILL="%(subkill)s" ;
653 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
654 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
655 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
656     sleep 0.2 
657     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
658         break
659     else
660         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
661         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
662     fi
663     sleep 1.8
664 done
665 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
666     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
667     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
668 fi
669 """
670     if nowait:
671         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
672
673     (out,err),proc = rexec(
674         cmd % {
675             'ppid' : ppid,
676             'pid' : pid,
677             'sudo' : 'sudo -S' if sudo else '',
678             'subkill' : subkill,
679         },
680         host = host,
681         port = port,
682         user = user,
683         gwuser = gwuser,
684         gw = gw,
685         agent = agent,
686         identity = identity,
687         server_key = server_key,
688         strict_host_checking = strict_host_checking
689         )
690     
691     # wait, don't leave zombies around
692     proc.wait()
693
694     return (out, err), proc
695
696 def _retry_rexec(args,
697                  log_msg,
698                  stdout = subprocess.PIPE,
699                  stdin = subprocess.PIPE, 
700                  stderr = subprocess.PIPE,
701                  env = None,
702                  retry = 3,
703                  tmp_known_hosts = None,
704                  blocking = True):
705
706     for x in range(retry):
707         # display command actually invoked when debug is turned on
708         message = " ".join( [ "'{}'".format(arg) for arg in args ] )
709         log("sshfuncs: invoking {}".format(message), logging.DEBUG)
710         extras = {} if PY2 else {'universal_newlines' : True}
711         # connects to the remote host and starts a remote connection
712         proc = subprocess.Popen(
713             args,
714             env = env,
715             stdout = stdout,
716             stdin = stdin, 
717             stderr = stderr,
718             **extras
719         )        
720         # attach tempfile object to the process, to make sure the file stays
721         # alive until the process is finished with it
722         proc._known_hosts = tmp_known_hosts
723     
724         # The argument block == False forces to rexec to return immediately, 
725         # without blocking 
726         try:
727             err = out = " "
728             if blocking:
729                 #(out, err) = proc.communicate()
730                 # The method communicate was re implemented for performance issues
731                 # when using python subprocess communicate method the ssh commands 
732                 # last one minute each
733                 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
734                 out, err = _communicate(proc, input=None)
735                 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
736
737             elif stdout:
738                 out = proc.stdout.read()
739                 if proc.poll() and stderr:
740                     err = proc.stderr.read()
741
742             log(log_msg, logging.DEBUG, out, err)
743
744             if proc.poll():
745                 skip = False
746
747                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
748                     # SSH error, can safely retry
749                     skip = True 
750                 elif retry:
751                     # Probably timed out or plain failed but can retry
752                     skip = True 
753                 
754                 if skip:
755                     t = x*2
756                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
757                             t, x, " ".join(args))
758                     log(msg, logging.DEBUG)
759
760                     time.sleep(t)
761                     continue
762             break
763         except RuntimeError as e:
764             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
765             log(msg, logging.DEBUG, out, err)
766
767             if retry <= 0:
768                 raise
769             retry -= 1
770
771     return ((out, err), proc)
772
773 # POSIX
774 # Don't remove. The method communicate was re implemented for performance issues
775 def _communicate(proc, input, timeout=None, err_on_timeout=True):
776     read_set = []
777     write_set = []
778     stdout = None # Return
779     stderr = None # Return
780
781     killed = False
782
783     if timeout is not None:
784         timelimit = time.time() + timeout
785         killtime = timelimit + 4
786         bailtime = timelimit + 4
787
788     if proc.stdin:
789         # Flush stdio buffer.  This might block, if the user has
790         # been writing to .stdin in an uncontrolled fashion.
791         proc.stdin.flush()
792         if input:
793             write_set.append(proc.stdin)
794         else:
795             proc.stdin.close()
796
797     if proc.stdout:
798         read_set.append(proc.stdout)
799         stdout = []
800
801     if proc.stderr:
802         read_set.append(proc.stderr)
803         stderr = []
804
805     input_offset = 0
806     while read_set or write_set:
807         if timeout is not None:
808             curtime = time.time()
809             if timeout is None or curtime > timelimit:
810                 if curtime > bailtime:
811                     break
812                 elif curtime > killtime:
813                     signum = signal.SIGKILL
814                 else:
815                     signum = signal.SIGTERM
816                 # Lets kill it
817                 os.kill(proc.pid, signum)
818                 select_timeout = 0.5
819             else:
820                 select_timeout = timelimit - curtime + 0.1
821         else:
822             select_timeout = 1.0
823
824         if select_timeout > 1.0:
825             select_timeout = 1.0
826
827         try:
828             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
829         except select.error as e:
830             if e[0] != 4:
831                 raise
832             else:
833                 continue
834
835         if not rlist and not wlist and not xlist and proc.poll() is not None:
836             # timeout and process exited, say bye
837             break
838
839         if proc.stdin in wlist:
840             # When select has indicated that the file is writable,
841             # we can write up to PIPE_BUF bytes without risk
842             # blocking.  POSIX defines PIPE_BUF >= 512
843             bytes_written = os.write(proc.stdin.fileno(),
844                     buffer(input, input_offset, 512))
845             input_offset += bytes_written
846
847             if input_offset >= len(input):
848                 proc.stdin.close()
849                 write_set.remove(proc.stdin)
850
851         # xxx possible distortion when upgrading to python3
852         # original py2 version used to do
853         # data = os.read(proc.stdout.fileno(), 1024)
854         # but this would return bytes, so..
855         if proc.stdout in rlist:
856             data = proc.stdout.read()
857             if not data:
858                 proc.stdout.close()
859                 read_set.remove(proc.stdout)
860             stdout.append(data)
861
862         # likewise
863         if proc.stderr in rlist:
864             data = proc.stderr.read()
865             if not data:
866                 proc.stderr.close()
867                 read_set.remove(proc.stderr)
868             stderr.append(data)
869
870     # All data exchanged.  Translate lists into strings.
871     if stdout is not None:
872         stdout = ''.join(stdout)
873     if stderr is not None:
874         stderr = ''.join(stderr)
875
876     # Translate newlines, if requested.  We cannot let the file
877     # object do the translation: It is based on stdio, which is
878     # impossible to combine with select (unless forcing no
879     # buffering).
880     # this however seems to make no sense in the context of python3
881     if PY2:
882         if proc.universal_newlines and hasattr(file, 'newlines'):
883             if stdout:
884                 stdout = proc._translate_newlines(stdout)
885             if stderr:
886                 stderr = proc._translate_newlines(stderr)
887
888     if killed and err_on_timeout:
889         errcode = proc.poll()
890         raise RuntimeError("Operation timed out", errcode, stdout, stderr)
891     else:
892         if killed:
893             proc.poll()
894         else:
895             proc.wait()
896         return (stdout, stderr)
897
898 def _proxy_command(gw, gwuser, gwidentity):
899     """
900     Constructs the SSH ProxyCommand option to add to the SSH command to connect
901     via a proxy
902         :param gw: SSH proxy hostname
903         :type gw:  str 
904        
905         :param gwuser: SSH proxy username
906         :type gwuser:  str
907
908         :param gwidentity: SSH proxy identity file 
909         :type gwidentity:  str
910
911   
912         :rtype: str 
913         
914         returns the SSH ProxyCommand option.
915     """
916
917     proxycommand = 'ProxyCommand=ssh -q '
918     if gwidentity:
919         proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
920     if gwuser:
921         proxycommand += '%s' % gwuser
922     else:
923         proxycommand += '%r'
924     proxycommand += '@%s -W %%h:%%p' % gw
925
926     return proxycommand
927