cosmetic
[nepi.git] / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Claudio Freire <claudio-daniel.freire@inria.fr>
19
20 ## TODO: This code needs reviewing !!!
21
22 import base64
23 import errno
24 import hashlib
25 import logging
26 import os
27 import os.path
28 import re
29 import select
30 import signal
31 import socket
32 import subprocess
33 import threading
34 import time
35 import tempfile
36
37 from six import PY2
38
39 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
40
41 logger = logging.getLogger("sshfuncs")
42
43 def log(msg, level = logging.DEBUG, out = None, err = None):
44     if out:
45         msg += " - OUT: {} ".format(out)
46     if err:
47         msg += " - ERROR: {} ".format(err)
48     logger.log(level, msg)
49
50 if hasattr(os, "devnull"):
51     DEV_NULL = os.devnull
52 else:
53     DEV_NULL = "/dev/null"
54
55 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
56
57 class STDOUT: 
58     """
59     Special value that when given to rspawn in stderr causes stderr to 
60     redirect to whatever stdout was redirected to.
61     """
62     pass
63
64 class ProcStatus:
65     """
66     Codes for status of remote spawned process
67     """
68     # Process is still running
69     RUNNING = 1
70
71     # Process is finished
72     FINISHED = 2
73     
74     # Process hasn't started running yet (this should be very rare)
75     NOT_STARTED = 3
76
77 hostbyname_cache = dict()
78 hostbyname_cache_lock = threading.Lock()
79
80 def resolve_hostname(host):
81     ip = None
82
83     if host in ["localhost", "127.0.0.1", "::1"]:
84         extras = {} if PY2 else {'universal_newlines' : True}
85         p = subprocess.Popen(
86             "ip -o addr list",
87             shell = True,
88             stdout = subprocess.PIPE,
89             stderr = subprocess.PIPE,
90             **extras
91         )
92         stdout, stderr = p.communicate()
93         m = _re_inet.findall(stdout)
94         ip = m[0][1].split("/")[0]
95     else:
96         ip = socket.gethostbyname(host)
97
98     return ip
99
100 def gethostbyname(host):
101     global hostbyname_cache
102     global hostbyname_cache_lock
103     
104     hostbyname = hostbyname_cache.get(host)
105     if not hostbyname:
106         with hostbyname_cache_lock:
107             hostbyname = resolve_hostname(host)
108             hostbyname_cache[host] = hostbyname
109
110             msg = " Added hostbyname {} - {} ".format(host, hostbyname)
111             log(msg, logging.DEBUG)
112
113     return hostbyname
114
115 OPENSSH_HAS_PERSIST = None
116
117 def openssh_has_persist():
118     """ The ssh_config options ControlMaster and ControlPersist allow to
119     reuse a same network connection for multiple ssh sessions. In this 
120     way limitations on number of open ssh connections can be bypassed.
121     However, older versions of openSSH do not support this feature.
122     This function is used to determine if ssh connection persist features
123     can be used.
124     """
125     global OPENSSH_HAS_PERSIST
126     if OPENSSH_HAS_PERSIST is None:
127         extras = {} if PY2 else {'universal_newlines' : True}
128         with open("/dev/null") as null:
129             proc = subprocess.Popen(
130                 ["ssh", "-v"],
131                 stdout = subprocess.PIPE,
132                 stderr = subprocess.STDOUT,
133                 stdin = null,
134                 **extras
135             )
136             out, err = proc.communicate()
137             proc.wait()
138         
139         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
140         OPENSSH_HAS_PERSIST = bool(vre.match(out))
141     return OPENSSH_HAS_PERSIST
142
143 def make_server_key_args(server_key, host, port):
144     """ Returns a reference to a temporary known_hosts file, to which 
145     the server key has been added. 
146     
147     Make sure to hold onto the temp file reference until the process is 
148     done with it
149
150     :param server_key: the server public key
151     :type server_key: str
152
153     :param host: the hostname
154     :type host: str
155
156     :param port: the ssh port
157     :type port: str
158
159     """
160     if port is not None:
161         host = '{}:{}'.format(host, str(port))
162
163     # Create a temporary server key file
164     tmp_known_hosts = tempfile.NamedTemporaryFile()
165    
166     hostbyname = gethostbyname(host) 
167
168     # Add the intended host key
169     tmp_known_hosts.write('{},{} {}\n'.format(host, hostbyname, server_key))
170     
171     # If we're not in strict mode, add user-configured keys
172     if os.environ.get('NEPI_STRICT_AUTH_MODE', "").lower() not in ('1', 'true', 'on'):
173         user_hosts_path = '{}/.ssh/known_hosts'.format(os.environ.get('HOME', ""))
174         if os.access(user_hosts_path, os.R_OK):
175             with open(user_hosts_path, "r") as f:
176                 tmp_known_hosts.write(f.read())
177         
178     tmp_known_hosts.flush()
179     
180     return tmp_known_hosts
181
182 def make_control_path(agent, forward_x11):
183     ctrl_path = "/tmp/nepi_ssh"
184
185     if agent:
186         ctrl_path += "_a"
187
188     if forward_x11:
189         ctrl_path += "_x"
190
191     ctrl_path += "-%r@%h:%p"
192
193     return ctrl_path
194
195 def shell_escape(s):
196     """ Escapes strings so that they are safe to use as command-line 
197     arguments """
198     if SHELL_SAFE.match(s):
199         # safe string - no escaping needed
200         return s
201     else:
202         # unsafe string - escape
203         def escape(c):
204             if (32 <= ord(c) < 127 or c in ('\r', '\n', '\t')) and c not in ("'", '"'):
205                 return c
206             else:
207                 return "'$'\\x{:02x}''".format(ord(c))
208         s = ''.join(map(escape, s))
209         return "'{}'".format(s)
210
211 def eintr_retry(func):
212     """Retries a function invocation when a EINTR occurs"""
213     import functools
214     @functools.wraps(func)
215     def rv(*p, **kw):
216         retry = kw.pop("_retry", False)
217         for i in range(0 if retry else 4):
218             try:
219                 return func(*p, **kw)
220             except (select.error, socket.error) as args:
221                 if args[0] == errno.EINTR:
222                     continue
223                 else:
224                     raise 
225             except OSError as e:
226                 if e.errno == errno.EINTR:
227                     continue
228                 else:
229                     raise
230         else:
231             return func(*p, **kw)
232     return rv
233
234 def rexec(command, host, user, 
235           port = None,
236           gwuser = None,
237           gw = None, 
238           agent = True,
239           sudo = False,
240           identity = None,
241           server_key = None,
242           env = None,
243           tty = False,
244           connect_timeout = 30,
245           retry = 3,
246           persistent = True,
247           forward_x11 = False,
248           blocking = True,
249           strict_host_checking = True):
250     """
251     Executes a remote command, returns ((stdout, stderr), process)
252     """
253
254     tmp_known_hosts = None
255     if not gw:
256         hostip = gethostbyname(host)
257     else: hostip = None
258
259     args = ['ssh', '-C',
260             # Don't bother with localhost. Makes test easier
261             '-o', 'NoHostAuthenticationForLocalhost=yes',
262             '-o', 'ConnectTimeout={}'.format(connect_timeout),
263             '-o', 'ConnectionAttempts=3',
264             '-o', 'ServerAliveInterval=30',
265             '-o', 'TCPKeepAlive=yes',
266             '-o', 'Batchmode=yes',
267             '-l', user, hostip or host]
268
269     if persistent and openssh_has_persist():
270         args.extend([
271             '-o', 'ControlMaster=auto',
272             '-o', 'ControlPath={}'.format(make_control_path(agent, forward_x11)),
273             '-o', 'ControlPersist=60' ])
274
275     if not strict_host_checking:
276         # Do not check for Host key. Unsafe.
277         args.extend(['-o', 'StrictHostKeyChecking=no'])
278
279     if gw:
280         proxycommand = _proxy_command(gw, gwuser, identity)
281         args.extend(['-o', proxycommand])
282
283     if agent:
284         args.append('-A')
285
286     if port:
287         args.append('-p{}'.format(port))
288
289     if identity:
290         identity = os.path.expanduser(identity)
291         args.extend(('-i', identity))
292
293     if tty:
294         args.append('-t')
295         args.append('-t')
296
297     if forward_x11:
298         args.append('-X')
299
300     if server_key:
301         # Create a temporary server key file
302         tmp_known_hosts = make_server_key_args(server_key, host, port)
303         args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
304
305     if sudo:
306         command = "sudo " + command
307
308     args.append(command)
309
310     log_msg = " rexec - host {} - command {} ".format(host, " ".join(map(str, args)))
311
312     stdout = stderr = stdin = subprocess.PIPE
313     if forward_x11:
314         stdout = stderr = stdin = None
315
316     return _retry_rexec(args, log_msg, 
317                         stderr = stderr,
318                         stdin = stdin,
319                         stdout = stdout,
320                         env = env, 
321                         retry = retry, 
322                         tmp_known_hosts = tmp_known_hosts,
323                         blocking = blocking)
324
325 def rcopy(source, dest,
326           port = None,
327           gwuser = None,
328           gw = None,
329           recursive = False,
330           identity = None,
331           server_key = None,
332           retry = 3,
333           strict_host_checking = True):
334     """
335     Copies from/to remote sites.
336     
337     Source and destination should have the user and host encoded
338     as per scp specs.
339     
340     Source can be a list of files to copy to a single destination, 
341     (in which case it is advised that the destination be a folder),
342     or a single file in a string.
343     """
344
345     # Parse destination as <user>@<server>:<path>
346     if isinstance(dest, str) and ':' in dest:
347         remspec, path = dest.split(':', 1)
348     elif isinstance(source, str) and ':' in source:
349         remspec, path = source.split(':', 1)
350     else:
351         raise ValueError("Both endpoints cannot be local")
352     user, host = remspec.rsplit('@', 1)
353     
354     # plain scp
355     tmp_known_hosts = None
356
357     args = ['scp', '-q', '-p', '-C',
358             # 2015-06-01 Thierry: I am commenting off blowfish
359             # as this is not available on a plain ubuntu 15.04 install
360             # this IMHO is too fragile, shoud be something the user
361             # decides explicitly (so he is at least aware of that dependency)
362             # Speed up transfer using blowfish cypher specification which is 
363             # faster than the default one (3des)
364             # '-c', 'blowfish',
365             # Don't bother with localhost. Makes test easier
366             '-o', 'NoHostAuthenticationForLocalhost=yes',
367             '-o', 'ConnectTimeout=60',
368             '-o', 'ConnectionAttempts=3',
369             '-o', 'ServerAliveInterval=30',
370             '-o', 'TCPKeepAlive=yes' ]
371             
372     if port:
373         args.append('-P{}'.format(port))
374
375     if gw:
376         proxycommand = _proxy_command(gw, gwuser, identity)
377         args.extend(['-o', proxycommand])
378
379     if recursive:
380         args.append('-r')
381
382     if identity:
383         identity = os.path.expanduser(identity)
384         args.extend(('-i', identity))
385
386     if server_key:
387         # Create a temporary server key file
388         tmp_known_hosts = make_server_key_args(server_key, host, port)
389         args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
390
391     if not strict_host_checking:
392         # Do not check for Host key. Unsafe.
393         args.extend(['-o', 'StrictHostKeyChecking=no'])
394     
395     if isinstance(source, list):
396         args.extend(source)
397     else:
398         if openssh_has_persist():
399             args.extend([
400                 '-o', 'ControlMaster=auto',
401                 '-o', 'ControlPath={}'.format(make_control_path(False, False))
402                 ])
403         args.append(source)
404
405     if isinstance(dest, list):
406         args.extend(dest)
407     else:
408         args.append(dest)
409
410     log_msg = " rcopy - host {} - command {} ".format(host, " ".join(map(str, args)))
411     
412     return _retry_rexec(args, log_msg, env = None, retry = retry, 
413                         tmp_known_hosts = tmp_known_hosts,
414                         blocking = True)
415
416 def rspawn(command, pidfile, 
417            stdout = '/dev/null', 
418            stderr = STDOUT, 
419            stdin = '/dev/null',
420            home = None, 
421            create_home = False, 
422            sudo = False,
423            host = None, 
424            port = None, 
425            user = None, 
426            gwuser = None,
427            gw = None,
428            agent = None, 
429            identity = None, 
430            server_key = None,
431            tty = False,
432            strict_host_checking = True):
433     """
434     Spawn a remote command such that it will continue working asynchronously in 
435     background. 
436
437         :param command: The command to run, it should be a single line.
438         :type command: str
439
440         :param pidfile: Path to a file where to store the pid and ppid of the 
441                         spawned process
442         :type pidfile: str
443
444         :param stdout: Path to file to redirect standard output. 
445                        The default value is /dev/null
446         :type stdout: str
447
448         :param stderr: Path to file to redirect standard error.
449                        If the special STDOUT value is used, stderr will 
450                        be redirected to the same file as stdout
451         :type stderr: str
452
453         :param stdin: Path to a file with input to be piped into the command's standard input
454         :type stdin: str
455
456         :param home: Path to working directory folder. 
457                     It is assumed to exist unless the create_home flag is set.
458         :type home: str
459
460         :param create_home: Flag to force creation of the home folder before 
461                             running the command
462         :type create_home: bool
463  
464         :param sudo: Flag forcing execution with sudo user
465         :type sudo: bool
466         
467         :rtype: tuple
468
469         (stdout, stderr), process
470         
471         Of the spawning process, which only captures errors at spawning time.
472         Usually only useful for diagnostics.
473     """
474     # Start process in a "daemonized" way, using nohup and heavy
475     # stdin/out redirection to avoid connection issues
476     if stderr is STDOUT:
477         stderr = '&1'
478     else:
479         stderr = ' ' + stderr
480     
481     daemon_command = '{{ {{ {command} > {stdout} 2>{stderr} < {stdin} & }} ; echo $! 1 > {pidfile} ; }}'\
482                      .format(command = command,
483                              pidfile = shell_escape(pidfile),
484                              stdout = stdout,
485                              stderr = stderr,
486                              stdin = stdin)
487     
488     cmd = "{create}{gohome} rm -f {pidfile} ; {sudo} nohup bash -c {command} "\
489           .format(command = shell_escape(daemon_command),
490                   sudo = 'sudo -S' if sudo else '',
491                   pidfile = shell_escape(pidfile),
492                   gohome = 'cd {} ; '.format(shell_escape(home)) if home else '',
493                   create = 'mkdir -p {} ; '.format(shell_escape(home)) if create_home and home else '')
494
495     (out, err), proc = rexec(
496         cmd,
497         host = host,
498         port = port,
499         user = user,
500         gwuser = gwuser,
501         gw = gw,
502         agent = agent,
503         identity = identity,
504         server_key = server_key,
505         tty = tty,
506         strict_host_checking = strict_host_checking ,
507     )
508     
509     if proc.wait():
510         raise RuntimeError("Failed to set up application on host {}: {} {}".format(host, out, err))
511
512     return ((out, err), proc)
513
514 @eintr_retry
515 def rgetpid(pidfile,
516             host = None, 
517             port = None, 
518             user = None, 
519             gwuser = None,
520             gw = None,
521             agent = None, 
522             identity = None,
523             server_key = None,
524             strict_host_checking = True):
525     """
526     Returns the pid and ppid of a process from a remote file where the 
527     information was stored.
528
529         :param home: Path to directory where the pidfile is located
530         :type home: str
531
532         :param pidfile: Name of file containing the pid information
533         :type pidfile: str
534         
535         :rtype: int
536         
537         A (pid, ppid) tuple useful for calling rstatus and rkill,
538         or None if the pidfile isn't valid yet (can happen when process is staring up)
539
540     """
541     (out, err), proc = rexec(
542         "cat {}".format(pidfile),
543         host = host,
544         port = port,
545         user = user,
546         gwuser = gwuser,
547         gw = gw,
548         agent = agent,
549         identity = identity,
550         server_key = server_key,
551         strict_host_checking = strict_host_checking
552     )
553         
554     if proc.wait():
555         return None
556     
557     if out:
558         try:
559             return [ int(x) for x in out.strip().split(' ', 1) ]
560         except:
561             # Ignore, many ways to fail that don't matter that much
562             return None
563
564 @eintr_retry
565 def rstatus(pid, ppid, 
566         host = None, 
567         port = None, 
568         user = None, 
569         gwuser = None,
570         gw = None,
571         agent = None, 
572         identity = None,
573         server_key = None,
574         strict_host_checking = True):
575     """
576     Returns a code representing the the status of a remote process
577
578         :param pid: Process id of the process
579         :type pid: int
580
581         :param ppid: Parent process id of process
582         :type ppid: int
583     
584         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
585     
586     """
587     (out, err), proc = rexec(
588         # Check only by pid. pid+ppid does not always work (especially with sudo) 
589         " (( ps --pid {pid} -o pid | grep -c {pid} && echo 'wait')  || echo 'done' ) | tail -n 1"\
590         .format(**locals()),
591         host = host,
592         port = port,
593         user = user,
594         gwuser = gwuser,
595         gw = gw,
596         agent = agent,
597         identity = identity,
598         server_key = server_key,
599         strict_host_checking = strict_host_checking
600     )
601     
602     if proc.wait():
603         return ProcStatus.NOT_STARTED
604     
605     status = False
606     if err:
607         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
608             status = True
609     elif out:
610         status = (out.strip() == 'wait')
611     else:
612         return ProcStatus.NOT_STARTED
613     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
614
615 @eintr_retry
616 def rkill(pid, ppid,
617         host = None, 
618         port = None, 
619         user = None, 
620         gwuser = None,
621         gw = None,
622         agent = None, 
623         sudo = False,
624         identity = None, 
625         server_key = None, 
626         nowait = False,
627         strict_host_checking = True):
628     """
629     Sends a kill signal to a remote process.
630
631     First tries a SIGTERM, and if the process does not end in 10 seconds,
632     it sends a SIGKILL.
633  
634         :param pid: Process id of process to be killed
635         :type pid: int
636
637         :param ppid: Parent process id of process to be killed
638         :type ppid: int
639
640         :param sudo: Flag indicating if sudo should be used to kill the process
641         :type sudo: bool
642         
643     """
644     subkill = "$(ps --ppid {} -o pid h)".format(pid)
645     cmd_format = """
646 SUBKILL="{subkill}" ;
647 {sudo} kill -- -{pid} $SUBKILL || /bin/true
648 {sudo} kill {pid} $SUBKILL || /bin/true
649 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
650     sleep 0.2 
651     if [ `ps --pid {pid} -o pid | grep -c {pid}` == '0' ]; then
652         break
653     else
654         {sudo} kill -- -{pid} $SUBKILL || /bin/true
655         {sudo} kill {pid} $SUBKILL || /bin/true
656     fi
657     sleep 1.8
658 done
659 if [ `ps --pid {pid} -o pid | grep -c {pid}` != '0' ]; then
660     {sudo} kill -9 -- -{pid} $SUBKILL || /bin/true
661     {sudo} kill -9 {pid} $SUBKILL || /bin/true
662 fi
663 """
664     if nowait:
665         cmd_format = "( {} ) >/dev/null 2>/dev/null </dev/null &".format(cmd_format)
666
667     sudo = 'sudo -S' if sudo else ''
668     (out, err), proc = rexec(
669         cmd_format.format(**locals()),
670         host = host,
671         port = port,
672         user = user,
673         gwuser = gwuser,
674         gw = gw,
675         agent = agent,
676         identity = identity,
677         server_key = server_key,
678         strict_host_checking = strict_host_checking
679         )
680     
681     # wait, don't leave zombies around
682     proc.wait()
683
684     return (out, err), proc
685
686 # add quotes around a shell arg only if it has spaces
687 def pretty_arg(shell_arg):
688     return shell_arg if ' ' not in shell_arg else "'{}'".format(shell_arg)
689 def pretty_args(shell_args):
690     return " ".join([pretty_arg(shell_arg) for shell_arg in shell_args])
691
692 def _retry_rexec(args,
693                  log_msg,
694                  stdout = subprocess.PIPE,
695                  stdin = subprocess.PIPE, 
696                  stderr = subprocess.PIPE,
697                  env = None,
698                  retry = 3,
699                  tmp_known_hosts = None,
700                  blocking = True):
701
702     for x in range(retry):
703         # display command actually invoked when debug is turned on
704         message = pretty_args(args)
705         log("sshfuncs: invoking {}".format(message), logging.DEBUG)
706         extras = {} if PY2 else {'universal_newlines' : True}
707         # connects to the remote host and starts a remote connection
708         proc = subprocess.Popen(
709             args,
710             env = env,
711             stdout = stdout,
712             stdin = stdin, 
713             stderr = stderr,
714             **extras
715         )        
716         # attach tempfile object to the process, to make sure the file stays
717         # alive until the process is finished with it
718         proc._known_hosts = tmp_known_hosts
719     
720         # The argument block == False forces to rexec to return immediately, 
721         # without blocking 
722         try:
723             err = out = " "
724             if blocking:
725                 #(out, err) = proc.communicate()
726                 # The method communicate was re implemented for performance issues
727                 # when using python subprocess communicate method the ssh commands 
728                 # last one minute each
729                 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
730                 out, err = _communicate(proc, input=None)
731                 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
732
733             elif stdout:
734                 out = proc.stdout.read()
735                 if proc.poll() and stderr:
736                     err = proc.stderr.read()
737
738             log(log_msg, logging.DEBUG, out, err)
739
740             if proc.poll():
741                 skip = False
742
743                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
744                     # SSH error, can safely retry
745                     skip = True 
746                 elif retry:
747                     # Probably timed out or plain failed but can retry
748                     skip = True 
749                 
750                 if skip:
751                     t = x*2
752                     msg = "SLEEPING {} ... ATEMPT {} - command {} "\
753                           .format(t, x, " ".join(args))
754                     log(msg, logging.DEBUG)
755
756                     time.sleep(t)
757                     continue
758             break
759         except RuntimeError as e:
760             msg = " rexec EXCEPTION - TIMEOUT -> {} \n {}".format(e.args, log_msg)
761             log(msg, logging.DEBUG, out, err)
762
763             if retry <= 0:
764                 raise
765             retry -= 1
766
767     return ((out, err), proc)
768
769 # POSIX
770 # Don't remove. The method communicate was re implemented for performance issues
771 def _communicate(proc, input, timeout=None, err_on_timeout=True):
772     read_set = []
773     write_set = []
774     stdout = None # Return
775     stderr = None # Return
776
777     killed = False
778
779     if timeout is not None:
780         timelimit = time.time() + timeout
781         killtime = timelimit + 4
782         bailtime = timelimit + 4
783
784     if proc.stdin:
785         # Flush stdio buffer.  This might block, if the user has
786         # been writing to .stdin in an uncontrolled fashion.
787         proc.stdin.flush()
788         if input:
789             write_set.append(proc.stdin)
790         else:
791             proc.stdin.close()
792
793     if proc.stdout:
794         read_set.append(proc.stdout)
795         stdout = []
796
797     if proc.stderr:
798         read_set.append(proc.stderr)
799         stderr = []
800
801     input_offset = 0
802     while read_set or write_set:
803         if timeout is not None:
804             curtime = time.time()
805             if timeout is None or curtime > timelimit:
806                 if curtime > bailtime:
807                     break
808                 elif curtime > killtime:
809                     signum = signal.SIGKILL
810                 else:
811                     signum = signal.SIGTERM
812                 # Lets kill it
813                 os.kill(proc.pid, signum)
814                 select_timeout = 0.5
815             else:
816                 select_timeout = timelimit - curtime + 0.1
817         else:
818             select_timeout = 1.0
819
820         if select_timeout > 1.0:
821             select_timeout = 1.0
822
823         try:
824             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
825         except select.error as e:
826             if e[0] != 4:
827                 raise
828             else:
829                 continue
830
831         if not rlist and not wlist and not xlist and proc.poll() is not None:
832             # timeout and process exited, say bye
833             break
834
835         if proc.stdin in wlist:
836             # When select has indicated that the file is writable,
837             # we can write up to PIPE_BUF bytes without risk
838             # blocking.  POSIX defines PIPE_BUF >= 512
839             bytes_written = os.write(proc.stdin.fileno(),
840                                      buffer(input, input_offset, 512))
841             input_offset += bytes_written
842
843             if input_offset >= len(input):
844                 proc.stdin.close()
845                 write_set.remove(proc.stdin)
846
847         # xxx possible distortion when upgrading to python3
848         # original py2 version used to do
849         # data = os.read(proc.stdout.fileno(), 1024)
850         # but this would return bytes, so..
851         if proc.stdout in rlist:
852             data = proc.stdout.read()
853             if not data:
854                 proc.stdout.close()
855                 read_set.remove(proc.stdout)
856             stdout.append(data)
857
858         # likewise
859         if proc.stderr in rlist:
860             data = proc.stderr.read()
861             if not data:
862                 proc.stderr.close()
863                 read_set.remove(proc.stderr)
864             stderr.append(data)
865
866     # All data exchanged.  Translate lists into strings.
867     if stdout is not None:
868         stdout = ''.join(stdout)
869     if stderr is not None:
870         stderr = ''.join(stderr)
871
872     # Translate newlines, if requested.  We cannot let the file
873     # object do the translation: It is based on stdio, which is
874     # impossible to combine with select (unless forcing no
875     # buffering).
876     # this however seems to make no sense in the context of python3
877     if PY2:
878         if proc.universal_newlines and hasattr(file, 'newlines'):
879             if stdout:
880                 stdout = proc._translate_newlines(stdout)
881             if stderr:
882                 stderr = proc._translate_newlines(stderr)
883
884     if killed and err_on_timeout:
885         errcode = proc.poll()
886         raise RuntimeError("Operation timed out", errcode, stdout, stderr)
887     else:
888         if killed:
889             proc.poll()
890         else:
891             proc.wait()
892         return (stdout, stderr)
893
894 def _proxy_command(gw, gwuser, gwidentity):
895     """
896     Constructs the SSH ProxyCommand option to add to the SSH command to connect
897     via a proxy
898         :param gw: SSH proxy hostname
899         :type gw:  str 
900        
901         :param gwuser: SSH proxy username
902         :type gwuser:  str
903
904         :param gwidentity: SSH proxy identity file 
905         :type gwidentity:  str
906
907   
908         :rtype: str 
909         
910         returns the SSH ProxyCommand option.
911     """
912
913     proxycommand = 'ProxyCommand=ssh -q '
914     if gwidentity:
915         proxycommand += '-i {} '.format(os.path.expanduser(gwidentity))
916     if gwuser:
917         proxycommand += '{}'.format(gwuser)
918     else:
919         proxycommand += '%r'
920     proxycommand += '@{} -W %h:%p'.format(gw)
921
922     return proxycommand
923