fixed 2 bugs/typos found through testing
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Claudio Freire <claudio-daniel.freire@inria.fr>
19
20 ## TODO: This code needs reviewing !!!
21
22 import base64
23 import errno
24 import hashlib
25 import logging
26 import os
27 import os.path
28 import re
29 import select
30 import signal
31 import socket
32 import subprocess
33 import threading
34 import time
35 import tempfile
36
37 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
38
39 logger = logging.getLogger("sshfuncs")
40
41 def log(msg, level = logging.DEBUG, out = None, err = None):
42     if out:
43         msg += " - OUT: %s " % out
44     if err:
45         msg += " - ERROR: %s " % err
46     logger.log(level, msg)
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60     pass
61
62 class ProcStatus:
63     """
64     Codes for status of remote spawned process
65     """
66     # Process is still running
67     RUNNING = 1
68
69     # Process is finished
70     FINISHED = 2
71     
72     # Process hasn't started running yet (this should be very rare)
73     NOT_STARTED = 3
74
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
77
78 def resolve_hostname(host):
79     ip = None
80
81     if host in ["localhost", "127.0.0.1", "::1"]:
82         p = subprocess.Popen(
83             "ip -o addr list",
84             shell=True,
85             stdout=subprocess.PIPE,
86             stderr=subprocess.PIPE,
87         )
88         stdout, stderr = p.communicate()
89         m = _re_inet.findall(stdout)
90         ip = m[0][1].split("/")[0]
91     else:
92         ip = socket.gethostbyname(host)
93
94     return ip
95
96 def gethostbyname(host):
97     global hostbyname_cache
98     global hostbyname_cache_lock
99     
100     hostbyname = hostbyname_cache.get(host)
101     if not hostbyname:
102         with hostbyname_cache_lock:
103             hostbyname = resolve_hostname(host)
104             hostbyname_cache[host] = hostbyname
105
106             msg = " Added hostbyname %s - %s " % (host, hostbyname)
107             log(msg, logging.DEBUG)
108
109     return hostbyname
110
111 OPENSSH_HAS_PERSIST = None
112
113 def openssh_has_persist():
114     """ The ssh_config options ControlMaster and ControlPersist allow to
115     reuse a same network connection for multiple ssh sessions. In this 
116     way limitations on number of open ssh connections can be bypassed.
117     However, older versions of openSSH do not support this feature.
118     This function is used to determine if ssh connection persist features
119     can be used.
120     """
121     global OPENSSH_HAS_PERSIST
122     if OPENSSH_HAS_PERSIST is None:
123         with open("/dev/null") as null:
124             proc = subprocess.Popen(
125                 ["ssh", "-v"],
126                 stdout = subprocess.PIPE,
127                 stderr = subprocess.STDOUT,
128                 stdin = null,
129             )
130             out,err = proc.communicate()
131             proc.wait()
132         
133         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
134         OPENSSH_HAS_PERSIST = bool(vre.match(out))
135     return OPENSSH_HAS_PERSIST
136
137 def make_server_key_args(server_key, host, port):
138     """ Returns a reference to a temporary known_hosts file, to which 
139     the server key has been added. 
140     
141     Make sure to hold onto the temp file reference until the process is 
142     done with it
143
144     :param server_key: the server public key
145     :type server_key: str
146
147     :param host: the hostname
148     :type host: str
149
150     :param port: the ssh port
151     :type port: str
152
153     """
154     if port is not None:
155         host = '%s:%s' % (host, str(port))
156
157     # Create a temporary server key file
158     tmp_known_hosts = tempfile.NamedTemporaryFile()
159    
160     hostbyname = gethostbyname(host) 
161
162     # Add the intended host key
163     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
164     
165     # If we're not in strict mode, add user-configured keys
166     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
167         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
168         if os.access(user_hosts_path, os.R_OK):
169             with open(user_hosts_path, "r") as f:
170                 tmp_known_hosts.write(f.read())
171         
172     tmp_known_hosts.flush()
173     
174     return tmp_known_hosts
175
176 def make_control_path(agent, forward_x11):
177     ctrl_path = "/tmp/nepi_ssh"
178
179     if agent:
180         ctrl_path +="_a"
181
182     if forward_x11:
183         ctrl_path +="_x"
184
185     ctrl_path += "-%r@%h:%p"
186
187     return ctrl_path
188
189 def shell_escape(s):
190     """ Escapes strings so that they are safe to use as command-line 
191     arguments """
192     if SHELL_SAFE.match(s):
193         # safe string - no escaping needed
194         return s
195     else:
196         # unsafe string - escape
197         def escape(c):
198             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
199                 return c
200             else:
201                 return "'$'\\x%02x''" % (ord(c),)
202         s = ''.join(map(escape, s))
203         return "'%s'" % (s,)
204
205 def eintr_retry(func):
206     """Retries a function invocation when a EINTR occurs"""
207     import functools
208     @functools.wraps(func)
209     def rv(*p, **kw):
210         retry = kw.pop("_retry", False)
211         for i in range(0 if retry else 4):
212             try:
213                 return func(*p, **kw)
214             except (select.error, socket.error) as args:
215                 if args[0] == errno.EINTR:
216                     continue
217                 else:
218                     raise 
219             except OSError as e:
220                 if e.errno == errno.EINTR:
221                     continue
222                 else:
223                     raise
224         else:
225             return func(*p, **kw)
226     return rv
227
228 def rexec(command, host, user, 
229         port = None,
230         gwuser = None,
231         gw = None, 
232         agent = True,
233         sudo = False,
234         identity = None,
235         server_key = None,
236         env = None,
237         tty = False,
238         connect_timeout = 30,
239         retry = 3,
240         persistent = True,
241         forward_x11 = False,
242         blocking = True,
243         strict_host_checking = True):
244     """
245     Executes a remote command, returns ((stdout,stderr),process)
246     """
247
248     tmp_known_hosts = None
249     if not gw:
250         hostip = gethostbyname(host)
251     else: hostip = None
252
253     args = ['ssh', '-C',
254             # Don't bother with localhost. Makes test easier
255             '-o', 'NoHostAuthenticationForLocalhost=yes',
256             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
257             '-o', 'ConnectionAttempts=3',
258             '-o', 'ServerAliveInterval=30',
259             '-o', 'TCPKeepAlive=yes',
260             '-o', 'Batchmode=yes',
261             '-l', user, hostip or host]
262
263     if persistent and openssh_has_persist():
264         args.extend([
265             '-o', 'ControlMaster=auto',
266             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
267             '-o', 'ControlPersist=60' ])
268
269     if not strict_host_checking:
270         # Do not check for Host key. Unsafe.
271         args.extend(['-o', 'StrictHostKeyChecking=no'])
272
273     if gw:
274         proxycommand = _proxy_command(gw, gwuser, identity)
275         args.extend(['-o', proxycommand])
276
277     if agent:
278         args.append('-A')
279
280     if port:
281         args.append('-p%d' % port)
282
283     if identity:
284         identity = os.path.expanduser(identity)
285         args.extend(('-i', identity))
286
287     if tty:
288         args.append('-t')
289         args.append('-t')
290
291     if forward_x11:
292         args.append('-X')
293
294     if server_key:
295         # Create a temporary server key file
296         tmp_known_hosts = make_server_key_args(server_key, host, port)
297         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
298
299     if sudo:
300         command = "sudo " + command
301
302     args.append(command)
303
304     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
305
306     stdout = stderr = stdin = subprocess.PIPE
307     if forward_x11:
308         stdout = stderr = stdin = None
309
310     return _retry_rexec(args, log_msg, 
311                         stderr = stderr,
312                         stdin = stdin,
313                         stdout = stdout,
314                         env = env, 
315                         retry = retry, 
316                         tmp_known_hosts = tmp_known_hosts,
317                         blocking = blocking)
318
319 def rcopy(source, dest,
320         port = None,
321         gwuser = None,
322         gw = None,
323         recursive = False,
324         identity = None,
325         server_key = None,
326         retry = 3,
327         strict_host_checking = True):
328     """
329     Copies from/to remote sites.
330     
331     Source and destination should have the user and host encoded
332     as per scp specs.
333     
334     Source can be a list of files to copy to a single destination, 
335     (in which case it is advised that the destination be a folder),
336     or a single file in a string.
337     """
338
339     # Parse destination as <user>@<server>:<path>
340     if isinstance(dest, str) and ':' in dest:
341         remspec, path = dest.split(':',1)
342     elif isinstance(source, str) and ':' in source:
343         remspec, path = source.split(':',1)
344     else:
345         raise ValueError("Both endpoints cannot be local")
346     user,host = remspec.rsplit('@',1)
347     
348     # plain scp
349     tmp_known_hosts = None
350
351     args = ['scp', '-q', '-p', '-C',
352             # 2015-06-01 Thierry: I am commenting off blowfish
353             # as this is not available on a plain ubuntu 15.04 install
354             # this IMHO is too fragile, shoud be something the user
355             # decides explicitly (so he is at least aware of that dependency)
356             # Speed up transfer using blowfish cypher specification which is 
357             # faster than the default one (3des)
358             # '-c', 'blowfish',
359             # Don't bother with localhost. Makes test easier
360             '-o', 'NoHostAuthenticationForLocalhost=yes',
361             '-o', 'ConnectTimeout=60',
362             '-o', 'ConnectionAttempts=3',
363             '-o', 'ServerAliveInterval=30',
364             '-o', 'TCPKeepAlive=yes' ]
365             
366     if port:
367         args.append('-P%d' % port)
368
369     if gw:
370         proxycommand = _proxy_command(gw, gwuser, identity)
371         args.extend(['-o', proxycommand])
372
373     if recursive:
374         args.append('-r')
375
376     if identity:
377         identity = os.path.expanduser(identity)
378         args.extend(('-i', identity))
379
380     if server_key:
381         # Create a temporary server key file
382         tmp_known_hosts = make_server_key_args(server_key, host, port)
383         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
384
385     if not strict_host_checking:
386         # Do not check for Host key. Unsafe.
387         args.extend(['-o', 'StrictHostKeyChecking=no'])
388     
389     if isinstance(source, list):
390         args.extend(source)
391     else:
392         if openssh_has_persist():
393             args.extend([
394                 '-o', 'ControlMaster=auto',
395                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
396                 ])
397         args.append(source)
398
399     if isinstance(dest, list):
400         args.extend(dest)
401     else:
402         args.append(dest)
403
404     log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
405     
406     return _retry_rexec(args, log_msg, env = None, retry = retry, 
407             tmp_known_hosts = tmp_known_hosts,
408             blocking = True)
409
410 def rspawn(command, pidfile, 
411            stdout = '/dev/null', 
412            stderr = STDOUT, 
413            stdin = '/dev/null',
414            home = None, 
415            create_home = False, 
416            sudo = False,
417            host = None, 
418            port = None, 
419            user = None, 
420            gwuser = None,
421            gw = None,
422            agent = None, 
423            identity = None, 
424            server_key = None,
425            tty = False,
426            strict_host_checking = True):
427     """
428     Spawn a remote command such that it will continue working asynchronously in 
429     background. 
430
431         :param command: The command to run, it should be a single line.
432         :type command: str
433
434         :param pidfile: Path to a file where to store the pid and ppid of the 
435                         spawned process
436         :type pidfile: str
437
438         :param stdout: Path to file to redirect standard output. 
439                        The default value is /dev/null
440         :type stdout: str
441
442         :param stderr: Path to file to redirect standard error.
443                        If the special STDOUT value is used, stderr will 
444                        be redirected to the same file as stdout
445         :type stderr: str
446
447         :param stdin: Path to a file with input to be piped into the command's standard input
448         :type stdin: str
449
450         :param home: Path to working directory folder. 
451                     It is assumed to exist unless the create_home flag is set.
452         :type home: str
453
454         :param create_home: Flag to force creation of the home folder before 
455                             running the command
456         :type create_home: bool
457  
458         :param sudo: Flag forcing execution with sudo user
459         :type sudo: bool
460         
461         :rtype: tuple
462
463         (stdout, stderr), process
464         
465         Of the spawning process, which only captures errors at spawning time.
466         Usually only useful for diagnostics.
467     """
468     # Start process in a "daemonized" way, using nohup and heavy
469     # stdin/out redirection to avoid connection issues
470     if stderr is STDOUT:
471         stderr = '&1'
472     else:
473         stderr = ' ' + stderr
474     
475     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
476         'command' : command,
477         'pidfile' : shell_escape(pidfile),
478         'stdout' : stdout,
479         'stderr' : stderr,
480         'stdin' : stdin,
481     }
482     
483     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
484             'command' : shell_escape(daemon_command),
485             'sudo' : 'sudo -S' if sudo else '',
486             'pidfile' : shell_escape(pidfile),
487             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
488             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
489         }
490
491     (out,err),proc = rexec(
492         cmd,
493         host = host,
494         port = port,
495         user = user,
496         gwuser = gwuser,
497         gw = gw,
498         agent = agent,
499         identity = identity,
500         server_key = server_key,
501         tty = tty,
502         strict_host_checking = strict_host_checking ,
503         )
504     
505     if proc.wait():
506         raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
507
508     return ((out, err), proc)
509
510 @eintr_retry
511 def rgetpid(pidfile,
512             host = None, 
513             port = None, 
514             user = None, 
515             gwuser = None,
516             gw = None,
517             agent = None, 
518             identity = None,
519             server_key = None,
520             strict_host_checking = True):
521     """
522     Returns the pid and ppid of a process from a remote file where the 
523     information was stored.
524
525         :param home: Path to directory where the pidfile is located
526         :type home: str
527
528         :param pidfile: Name of file containing the pid information
529         :type pidfile: str
530         
531         :rtype: int
532         
533         A (pid, ppid) tuple useful for calling rstatus and rkill,
534         or None if the pidfile isn't valid yet (can happen when process is staring up)
535
536     """
537     (out,err),proc = rexec(
538         "cat %(pidfile)s" % {
539             'pidfile' : pidfile,
540         },
541         host = host,
542         port = port,
543         user = user,
544         gwuser = gwuser,
545         gw = gw,
546         agent = agent,
547         identity = identity,
548         server_key = server_key,
549         strict_host_checking = strict_host_checking
550         )
551         
552     if proc.wait():
553         return None
554     
555     if out:
556         try:
557             return [ int(x) for x in out.strip().split(' ', 1) ]
558         except:
559             # Ignore, many ways to fail that don't matter that much
560             return None
561
562 @eintr_retry
563 def rstatus(pid, ppid, 
564         host = None, 
565         port = None, 
566         user = None, 
567         gwuser = None,
568         gw = None,
569         agent = None, 
570         identity = None,
571         server_key = None,
572         strict_host_checking = True):
573     """
574     Returns a code representing the the status of a remote process
575
576         :param pid: Process id of the process
577         :type pid: int
578
579         :param ppid: Parent process id of process
580         :type ppid: int
581     
582         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
583     
584     """
585     (out,err),proc = rexec(
586         # Check only by pid. pid+ppid does not always work (especially with sudo) 
587         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
588             'ppid' : ppid,
589             'pid' : pid,
590         },
591         host = host,
592         port = port,
593         user = user,
594         gwuser = gwuser,
595         gw = gw,
596         agent = agent,
597         identity = identity,
598         server_key = server_key,
599         strict_host_checking = strict_host_checking
600         )
601     
602     if proc.wait():
603         return ProcStatus.NOT_STARTED
604     
605     status = False
606     if err:
607         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
608             status = True
609     elif out:
610         status = (out.strip() == 'wait')
611     else:
612         return ProcStatus.NOT_STARTED
613     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
614
615 @eintr_retry
616 def rkill(pid, ppid,
617         host = None, 
618         port = None, 
619         user = None, 
620         gwuser = None,
621         gw = None,
622         agent = None, 
623         sudo = False,
624         identity = None, 
625         server_key = None, 
626         nowait = False,
627         strict_host_checking = True):
628     """
629     Sends a kill signal to a remote process.
630
631     First tries a SIGTERM, and if the process does not end in 10 seconds,
632     it sends a SIGKILL.
633  
634         :param pid: Process id of process to be killed
635         :type pid: int
636
637         :param ppid: Parent process id of process to be killed
638         :type ppid: int
639
640         :param sudo: Flag indicating if sudo should be used to kill the process
641         :type sudo: bool
642         
643     """
644     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
645     cmd = """
646 SUBKILL="%(subkill)s" ;
647 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
648 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
649 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
650     sleep 0.2 
651     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
652         break
653     else
654         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
655         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
656     fi
657     sleep 1.8
658 done
659 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
660     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
661     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
662 fi
663 """
664     if nowait:
665         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
666
667     (out,err),proc = rexec(
668         cmd % {
669             'ppid' : ppid,
670             'pid' : pid,
671             'sudo' : 'sudo -S' if sudo else '',
672             'subkill' : subkill,
673         },
674         host = host,
675         port = port,
676         user = user,
677         gwuser = gwuser,
678         gw = gw,
679         agent = agent,
680         identity = identity,
681         server_key = server_key,
682         strict_host_checking = strict_host_checking
683         )
684     
685     # wait, don't leave zombies around
686     proc.wait()
687
688     return (out, err), proc
689
690 def _retry_rexec(args,
691                  log_msg,
692                  stdout = subprocess.PIPE,
693                  stdin = subprocess.PIPE, 
694                  stderr = subprocess.PIPE,
695                  env = None,
696                  retry = 3,
697                  tmp_known_hosts = None,
698                  blocking = True):
699
700     for x in range(retry):
701         # display command actually invoked when debug is turned on
702         message = " ".join( [ "'{}'".format(arg) for arg in args ] )
703         log("sshfuncs: invoking {}".format(message), logging.DEBUG)
704         # connects to the remote host and starts a remote connection
705         proc = subprocess.Popen(
706             args,
707             env = env,
708             stdout = stdout,
709             stdin = stdin, 
710             stderr = stderr,
711         )        
712         # attach tempfile object to the process, to make sure the file stays
713         # alive until the process is finished with it
714         proc._known_hosts = tmp_known_hosts
715     
716         # The argument block == False forces to rexec to return immediately, 
717         # without blocking 
718         try:
719             err = out = " "
720             if blocking:
721                 #(out, err) = proc.communicate()
722                 # The method communicate was re implemented for performance issues
723                 # when using python subprocess communicate method the ssh commands 
724                 # last one minute each
725                 #log("BEFORE communicate", level=logging.INFO); import time; beg=time.time()
726                 out, err = _communicate(proc, input=None)
727                 #log("AFTER communicate - {}s".format(time.time()-beg), level=logging.INFO)
728
729             elif stdout:
730                 out = proc.stdout.read()
731                 if proc.poll() and stderr:
732                     err = proc.stderr.read()
733
734             log(log_msg, logging.DEBUG, out, err)
735
736             if proc.poll():
737                 skip = False
738
739                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
740                     # SSH error, can safely retry
741                     skip = True 
742                 elif retry:
743                     # Probably timed out or plain failed but can retry
744                     skip = True 
745                 
746                 if skip:
747                     t = x*2
748                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
749                             t, x, " ".join(args))
750                     log(msg, logging.DEBUG)
751
752                     time.sleep(t)
753                     continue
754             break
755         except RuntimeError as e:
756             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
757             log(msg, logging.DEBUG, out, err)
758
759             if retry <= 0:
760                 raise
761             retry -= 1
762
763     return ((out, err), proc)
764
765 # POSIX
766 # Don't remove. The method communicate was re implemented for performance issues
767 def _communicate(proc, input, timeout=None, err_on_timeout=True):
768     read_set = []
769     write_set = []
770     stdout = None # Return
771     stderr = None # Return
772
773     killed = False
774
775     if timeout is not None:
776         timelimit = time.time() + timeout
777         killtime = timelimit + 4
778         bailtime = timelimit + 4
779
780     if proc.stdin:
781         # Flush stdio buffer.  This might block, if the user has
782         # been writing to .stdin in an uncontrolled fashion.
783         proc.stdin.flush()
784         if input:
785             write_set.append(proc.stdin)
786         else:
787             proc.stdin.close()
788
789     if proc.stdout:
790         read_set.append(proc.stdout)
791         stdout = []
792
793     if proc.stderr:
794         read_set.append(proc.stderr)
795         stderr = []
796
797     input_offset = 0
798     while read_set or write_set:
799         if timeout is not None:
800             curtime = time.time()
801             if timeout is None or curtime > timelimit:
802                 if curtime > bailtime:
803                     break
804                 elif curtime > killtime:
805                     signum = signal.SIGKILL
806                 else:
807                     signum = signal.SIGTERM
808                 # Lets kill it
809                 os.kill(proc.pid, signum)
810                 select_timeout = 0.5
811             else:
812                 select_timeout = timelimit - curtime + 0.1
813         else:
814             select_timeout = 1.0
815
816         if select_timeout > 1.0:
817             select_timeout = 1.0
818
819         try:
820             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
821         except select.error as e:
822             if e[0] != 4:
823                 raise
824             else:
825                 continue
826
827         if not rlist and not wlist and not xlist and proc.poll() is not None:
828             # timeout and process exited, say bye
829             break
830
831         if proc.stdin in wlist:
832             # When select has indicated that the file is writable,
833             # we can write up to PIPE_BUF bytes without risk
834             # blocking.  POSIX defines PIPE_BUF >= 512
835             bytes_written = os.write(proc.stdin.fileno(),
836                     buffer(input, input_offset, 512))
837             input_offset += bytes_written
838
839             if input_offset >= len(input):
840                 proc.stdin.close()
841                 write_set.remove(proc.stdin)
842
843         if proc.stdout in rlist:
844             data = os.read(proc.stdout.fileno(), 1024)
845             if data == "":
846                 proc.stdout.close()
847                 read_set.remove(proc.stdout)
848             stdout.append(data)
849
850         if proc.stderr in rlist:
851             data = os.read(proc.stderr.fileno(), 1024)
852             if data == "":
853                 proc.stderr.close()
854                 read_set.remove(proc.stderr)
855             stderr.append(data)
856
857     # All data exchanged.  Translate lists into strings.
858     if stdout is not None:
859         stdout = ''.join(stdout)
860     if stderr is not None:
861         stderr = ''.join(stderr)
862
863     # Translate newlines, if requested.  We cannot let the file
864     # object do the translation: It is based on stdio, which is
865     # impossible to combine with select (unless forcing no
866     # buffering).
867     if proc.universal_newlines and hasattr(file, 'newlines'):
868         if stdout:
869             stdout = proc._translate_newlines(stdout)
870         if stderr:
871             stderr = proc._translate_newlines(stderr)
872
873     if killed and err_on_timeout:
874         errcode = proc.poll()
875         raise RuntimeError("Operation timed out", errcode, stdout, stderr)
876     else:
877         if killed:
878             proc.poll()
879         else:
880             proc.wait()
881         return (stdout, stderr)
882
883 def _proxy_command(gw, gwuser, gwidentity):
884     """
885     Constructs the SSH ProxyCommand option to add to the SSH command to connect
886     via a proxy
887         :param gw: SSH proxy hostname
888         :type gw:  str 
889        
890         :param gwuser: SSH proxy username
891         :type gwuser:  str
892
893         :param gwidentity: SSH proxy identity file 
894         :type gwidentity:  str
895
896   
897         :rtype: str 
898         
899         returns the SSH ProxyCommand option.
900     """
901
902     proxycommand = 'ProxyCommand=ssh -q '
903     if gwidentity:
904         proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
905     if gwuser:
906         proxycommand += '%s' % gwuser
907     else:
908         proxycommand += '%r'
909     proxycommand += '@%s -W %%h:%%p' % gw
910
911     return proxycommand
912