8c83823e4e6dae8b9123bb29371928c5953cc649
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 ## TODO: This code needs reviewing !!!
22
23 import base64
24 import errno
25 import hashlib
26 import logging
27 import os
28 import os.path
29 import re
30 import select
31 import signal
32 import socket
33 import subprocess
34 import threading
35 import time
36 import tempfile
37
38 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
39
40 logger = logging.getLogger("sshfuncs")
41
42 def log(msg, level, out = None, err = None):
43     if out:
44         msg += " - OUT: %s " % out
45
46     if err:
47         msg += " - ERROR: %s " % err
48
49     logger.log(level, msg)
50
51 if hasattr(os, "devnull"):
52     DEV_NULL = os.devnull
53 else:
54     DEV_NULL = "/dev/null"
55
56 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
57
58 class STDOUT: 
59     """
60     Special value that when given to rspawn in stderr causes stderr to 
61     redirect to whatever stdout was redirected to.
62     """
63
64 class ProcStatus:
65     """
66     Codes for status of remote spawned process
67     """
68     # Process is still running
69     RUNNING = 1
70
71     # Process is finished
72     FINISHED = 2
73     
74     # Process hasn't started running yet (this should be very rare)
75     NOT_STARTED = 3
76
77 hostbyname_cache = dict()
78 hostbyname_cache_lock = threading.Lock()
79
80 def resolve_hostname(host):
81     ip = None
82
83     if host in ["localhost", "127.0.0.1", "::1"]:
84         p = subprocess.Popen("ip -o addr list", shell=True,
85                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
86         stdout, stderr = p.communicate()
87         m = _re_inet.findall(stdout)
88         ip = m[0][1].split("/")[0]
89     else:
90         ip = socket.gethostbyname(host)
91
92     return ip
93
94 def gethostbyname(host):
95     global hostbyname_cache
96     global hostbyname_cache_lock
97     
98     hostbyname = hostbyname_cache.get(host)
99     if not hostbyname:
100         with hostbyname_cache_lock:
101             hostbyname = resolve_hostname(host)
102             hostbyname_cache[host] = hostbyname
103
104             msg = " Added hostbyname %s - %s " % (host, hostbyname)
105             log(msg, logging.DEBUG)
106
107     return hostbyname
108
109 OPENSSH_HAS_PERSIST = None
110
111 def openssh_has_persist():
112     """ The ssh_config options ControlMaster and ControlPersist allow to
113     reuse a same network connection for multiple ssh sessions. In this 
114     way limitations on number of open ssh connections can be bypassed.
115     However, older versions of openSSH do not support this feature.
116     This function is used to determine if ssh connection persist features
117     can be used.
118     """
119     global OPENSSH_HAS_PERSIST
120     if OPENSSH_HAS_PERSIST is None:
121         proc = subprocess.Popen(["ssh","-v"],
122             stdout = subprocess.PIPE,
123             stderr = subprocess.STDOUT,
124             stdin = open("/dev/null","r") )
125         out,err = proc.communicate()
126         proc.wait()
127         
128         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
129         OPENSSH_HAS_PERSIST = bool(vre.match(out))
130     return OPENSSH_HAS_PERSIST
131
132 def make_server_key_args(server_key, host, port):
133     """ Returns a reference to a temporary known_hosts file, to which 
134     the server key has been added. 
135     
136     Make sure to hold onto the temp file reference until the process is 
137     done with it
138
139     :param server_key: the server public key
140     :type server_key: str
141
142     :param host: the hostname
143     :type host: str
144
145     :param port: the ssh port
146     :type port: str
147
148     """
149     if port is not None:
150         host = '%s:%s' % (host, str(port))
151
152     # Create a temporary server key file
153     tmp_known_hosts = tempfile.NamedTemporaryFile()
154    
155     hostbyname = gethostbyname(host) 
156
157     # Add the intended host key
158     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
159     
160     # If we're not in strict mode, add user-configured keys
161     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
162         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
163         if os.access(user_hosts_path, os.R_OK):
164             f = open(user_hosts_path, "r")
165             tmp_known_hosts.write(f.read())
166             f.close()
167         
168     tmp_known_hosts.flush()
169     
170     return tmp_known_hosts
171
172 def make_control_path(agent, forward_x11):
173     ctrl_path = "/tmp/nepi_ssh"
174
175     if agent:
176         ctrl_path +="_a"
177
178     if forward_x11:
179         ctrl_path +="_x"
180
181     ctrl_path += "-%r@%h:%p"
182
183     return ctrl_path
184
185 def shell_escape(s):
186     """ Escapes strings so that they are safe to use as command-line 
187     arguments """
188     if SHELL_SAFE.match(s):
189         # safe string - no escaping needed
190         return s
191     else:
192         # unsafe string - escape
193         def escp(c):
194             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
195                 return c
196             else:
197                 return "'$'\\x%02x''" % (ord(c),)
198         s = ''.join(map(escp,s))
199         return "'%s'" % (s,)
200
201 def eintr_retry(func):
202     """Retries a function invocation when a EINTR occurs"""
203     import functools
204     @functools.wraps(func)
205     def rv(*p, **kw):
206         retry = kw.pop("_retry", False)
207         for i in xrange(0 if retry else 4):
208             try:
209                 return func(*p, **kw)
210             except (select.error, socket.error), args:
211                 if args[0] == errno.EINTR:
212                     continue
213                 else:
214                     raise 
215             except OSError, e:
216                 if e.errno == errno.EINTR:
217                     continue
218                 else:
219                     raise
220         else:
221             return func(*p, **kw)
222     return rv
223
224 def rexec(command, host, user, 
225         port = None,
226         gwuser = None,
227         gw = None, 
228         agent = True,
229         sudo = False,
230         identity = None,
231         server_key = None,
232         env = None,
233         tty = False,
234         connect_timeout = 30,
235         retry = 3,
236         persistent = True,
237         forward_x11 = False,
238         blocking = True,
239         strict_host_checking = True):
240     """
241     Executes a remote command, returns ((stdout,stderr),process)
242     """
243
244     tmp_known_hosts = None
245     if not gw:
246         hostip = gethostbyname(host)
247     else: hostip = None
248
249     args = ['ssh', '-C',
250             # Don't bother with localhost. Makes test easier
251             '-o', 'NoHostAuthenticationForLocalhost=yes',
252             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
253             '-o', 'ConnectionAttempts=3',
254             '-o', 'ServerAliveInterval=30',
255             '-o', 'TCPKeepAlive=yes',
256             '-o', 'Batchmode=yes',
257             '-l', user, hostip or host]
258
259     if persistent and openssh_has_persist():
260         args.extend([
261             '-o', 'ControlMaster=auto',
262             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
263             '-o', 'ControlPersist=60' ])
264
265     if not strict_host_checking:
266         # Do not check for Host key. Unsafe.
267         args.extend(['-o', 'StrictHostKeyChecking=no'])
268
269     if gw:
270         proxycommand = _proxy_command(gw, gwuser, identity)
271         args.extend(['-o', proxycommand])
272
273     if agent:
274         args.append('-A')
275
276     if port:
277         args.append('-p%d' % port)
278
279     if identity:
280         identity = os.path.expanduser(identity)
281         args.extend(('-i', identity))
282
283     if tty:
284         args.append('-t')
285         args.append('-t')
286
287     if forward_x11:
288         args.append('-X')
289
290     if server_key:
291         # Create a temporary server key file
292         tmp_known_hosts = make_server_key_args(server_key, host, port)
293         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
294
295     if sudo:
296         command = "sudo " + command
297
298     args.append(command)
299
300     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
301
302     stdout = stderr = stdin = subprocess.PIPE
303     if forward_x11:
304         stdout = stderr = stdin = None
305
306     return _retry_rexec(args, log_msg, 
307             stderr = stderr,
308             stdin = stdin,
309             stdout = stdout,
310             env = env, 
311             retry = retry, 
312             tmp_known_hosts = tmp_known_hosts,
313             blocking = blocking)
314
315 def rcopy(source, dest,
316         port = None,
317         gwuser = None,
318         gw = None,
319         recursive = False,
320         identity = None,
321         server_key = None,
322         retry = 3,
323         strict_host_checking = True):
324     """
325     Copies from/to remote sites.
326     
327     Source and destination should have the user and host encoded
328     as per scp specs.
329     
330     Source can be a list of files to copy to a single destination, 
331     (in which case it is advised that the destination be a folder),
332     or a single file in a string.
333     """
334
335     # Parse destination as <user>@<server>:<path>
336     if isinstance(dest, str) and ':' in dest:
337         remspec, path = dest.split(':',1)
338     elif isinstance(source, str) and ':' in source:
339         remspec, path = source.split(':',1)
340     else:
341         raise ValueError, "Both endpoints cannot be local"
342     user,host = remspec.rsplit('@',1)
343     
344     # plain scp
345     tmp_known_hosts = None
346
347     args = ['scp', '-q', '-p', '-C',
348             # Speed up transfer using blowfish cypher specification which is 
349             # faster than the default one (3des)
350             '-c', 'blowfish',
351             # Don't bother with localhost. Makes test easier
352             '-o', 'NoHostAuthenticationForLocalhost=yes',
353             '-o', 'ConnectTimeout=60',
354             '-o', 'ConnectionAttempts=3',
355             '-o', 'ServerAliveInterval=30',
356             '-o', 'TCPKeepAlive=yes' ]
357             
358     if port:
359         args.append('-P%d' % port)
360
361     if gw:
362         proxycommand = _proxy_command(gw, gwuser, identity)
363         args.extend(['-o', proxycommand])
364
365     if recursive:
366         args.append('-r')
367
368     if identity:
369         identity = os.path.expanduser(identity)
370         args.extend(('-i', identity))
371
372     if server_key:
373         # Create a temporary server key file
374         tmp_known_hosts = make_server_key_args(server_key, host, port)
375         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
376
377     if not strict_host_checking:
378         # Do not check for Host key. Unsafe.
379         args.extend(['-o', 'StrictHostKeyChecking=no'])
380     
381     if isinstance(source, list):
382         args.extend(source)
383     else:
384         if openssh_has_persist():
385             args.extend([
386                 '-o', 'ControlMaster=auto',
387                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
388                 ])
389         args.append(source)
390
391     if isinstance(dest, list):
392         args.extend(dest)
393     else:
394         args.append(dest)
395
396     log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
397     
398     return _retry_rexec(args, log_msg, env = None, retry = retry, 
399             tmp_known_hosts = tmp_known_hosts,
400             blocking = True)
401
402 def rspawn(command, pidfile, 
403         stdout = '/dev/null', 
404         stderr = STDOUT, 
405         stdin = '/dev/null',
406         home = None, 
407         create_home = False, 
408         sudo = False,
409         host = None, 
410         port = None, 
411         user = None, 
412         gwuser = None,
413         gw = None,
414         agent = None, 
415         identity = None, 
416         server_key = None,
417         tty = False,
418         strict_host_checking = True):
419     """
420     Spawn a remote command such that it will continue working asynchronously in 
421     background. 
422
423         :param command: The command to run, it should be a single line.
424         :type command: str
425
426         :param pidfile: Path to a file where to store the pid and ppid of the 
427                         spawned process
428         :type pidfile: str
429
430         :param stdout: Path to file to redirect standard output. 
431                        The default value is /dev/null
432         :type stdout: str
433
434         :param stderr: Path to file to redirect standard error.
435                        If the special STDOUT value is used, stderr will 
436                        be redirected to the same file as stdout
437         :type stderr: str
438
439         :param stdin: Path to a file with input to be piped into the command's standard input
440         :type stdin: str
441
442         :param home: Path to working directory folder. 
443                     It is assumed to exist unless the create_home flag is set.
444         :type home: str
445
446         :param create_home: Flag to force creation of the home folder before 
447                             running the command
448         :type create_home: bool
449  
450         :param sudo: Flag forcing execution with sudo user
451         :type sudo: bool
452         
453         :rtype: tuple
454
455         (stdout, stderr), process
456         
457         Of the spawning process, which only captures errors at spawning time.
458         Usually only useful for diagnostics.
459     """
460     # Start process in a "daemonized" way, using nohup and heavy
461     # stdin/out redirection to avoid connection issues
462     if stderr is STDOUT:
463         stderr = '&1'
464     else:
465         stderr = ' ' + stderr
466     
467     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
468         'command' : command,
469         'pidfile' : shell_escape(pidfile),
470         'stdout' : stdout,
471         'stderr' : stderr,
472         'stdin' : stdin,
473     }
474     
475     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
476             'command' : shell_escape(daemon_command),
477             'sudo' : 'sudo -S' if sudo else '',
478             'pidfile' : shell_escape(pidfile),
479             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
480             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
481         }
482
483     (out,err),proc = rexec(
484         cmd,
485         host = host,
486         port = port,
487         user = user,
488         gwuser = gwuser,
489         gw = gw,
490         agent = agent,
491         identity = identity,
492         server_key = server_key,
493         tty = tty,
494         strict_host_checking = strict_host_checking ,
495         )
496     
497     if proc.wait():
498         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
499
500     return ((out, err), proc)
501
502 @eintr_retry
503 def rgetpid(pidfile,
504         host = None, 
505         port = None, 
506         user = None, 
507         gwuser = None,
508         gw = None,
509         agent = None, 
510         identity = None,
511         server_key = None,
512         strict_host_checking = True):
513     """
514     Returns the pid and ppid of a process from a remote file where the 
515     information was stored.
516
517         :param home: Path to directory where the pidfile is located
518         :type home: str
519
520         :param pidfile: Name of file containing the pid information
521         :type pidfile: str
522         
523         :rtype: int
524         
525         A (pid, ppid) tuple useful for calling rstatus and rkill,
526         or None if the pidfile isn't valid yet (can happen when process is staring up)
527
528     """
529     (out,err),proc = rexec(
530         "cat %(pidfile)s" % {
531             'pidfile' : pidfile,
532         },
533         host = host,
534         port = port,
535         user = user,
536         gwuser = gwuser,
537         gw = gw,
538         agent = agent,
539         identity = identity,
540         server_key = server_key,
541         strict_host_checking = strict_host_checking
542         )
543         
544     if proc.wait():
545         return None
546     
547     if out:
548         try:
549             return map(int,out.strip().split(' ',1))
550         except:
551             # Ignore, many ways to fail that don't matter that much
552             return None
553
554 @eintr_retry
555 def rstatus(pid, ppid, 
556         host = None, 
557         port = None, 
558         user = None, 
559         gwuser = None,
560         gw = None,
561         agent = None, 
562         identity = None,
563         server_key = None,
564         strict_host_checking = True):
565     """
566     Returns a code representing the the status of a remote process
567
568         :param pid: Process id of the process
569         :type pid: int
570
571         :param ppid: Parent process id of process
572         :type ppid: int
573     
574         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
575     
576     """
577     (out,err),proc = rexec(
578         # Check only by pid. pid+ppid does not always work (especially with sudo) 
579         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
580             'ppid' : ppid,
581             'pid' : pid,
582         },
583         host = host,
584         port = port,
585         user = user,
586         gwuser = gwuser,
587         gw = gw,
588         agent = agent,
589         identity = identity,
590         server_key = server_key,
591         strict_host_checking = strict_host_checking
592         )
593     
594     if proc.wait():
595         return ProcStatus.NOT_STARTED
596     
597     status = False
598     if err:
599         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
600             status = True
601     elif out:
602         status = (out.strip() == 'wait')
603     else:
604         return ProcStatus.NOT_STARTED
605     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
606
607 @eintr_retry
608 def rkill(pid, ppid,
609         host = None, 
610         port = None, 
611         user = None, 
612         gwuser = None,
613         gw = None,
614         agent = None, 
615         sudo = False,
616         identity = None, 
617         server_key = None, 
618         nowait = False,
619         strict_host_checking = True):
620     """
621     Sends a kill signal to a remote process.
622
623     First tries a SIGTERM, and if the process does not end in 10 seconds,
624     it sends a SIGKILL.
625  
626         :param pid: Process id of process to be killed
627         :type pid: int
628
629         :param ppid: Parent process id of process to be killed
630         :type ppid: int
631
632         :param sudo: Flag indicating if sudo should be used to kill the process
633         :type sudo: bool
634         
635     """
636     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
637     cmd = """
638 SUBKILL="%(subkill)s" ;
639 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
640 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
641 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
642     sleep 0.2 
643     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
644         break
645     else
646         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
647         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
648     fi
649     sleep 1.8
650 done
651 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
652     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
653     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
654 fi
655 """
656     if nowait:
657         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
658
659     (out,err),proc = rexec(
660         cmd % {
661             'ppid' : ppid,
662             'pid' : pid,
663             'sudo' : 'sudo -S' if sudo else '',
664             'subkill' : subkill,
665         },
666         host = host,
667         port = port,
668         user = user,
669         gwuser = gwuser,
670         gw = gw,
671         agent = agent,
672         identity = identity,
673         server_key = server_key,
674         strict_host_checking = strict_host_checking
675         )
676     
677     # wait, don't leave zombies around
678     proc.wait()
679
680     return (out, err), proc
681
682 def _retry_rexec(args,
683         log_msg,
684         stdout = subprocess.PIPE,
685         stdin = subprocess.PIPE, 
686         stderr = subprocess.PIPE,
687         env = None,
688         retry = 3,
689         tmp_known_hosts = None,
690         blocking = True):
691
692     for x in xrange(retry):
693         # connects to the remote host and starts a remote connection
694         proc = subprocess.Popen(args,
695                 env = env,
696                 stdout = stdout,
697                 stdin = stdin, 
698                 stderr = stderr)
699         
700         # attach tempfile object to the process, to make sure the file stays
701         # alive until the process is finished with it
702         proc._known_hosts = tmp_known_hosts
703     
704         # The argument block == False forces to rexec to return immediately, 
705         # without blocking 
706         try:
707             err = out = " "
708             if blocking:
709                 #(out, err) = proc.communicate()
710                 # The method communicate was re implemented for performance issues
711                 # when using python subprocess communicate method the ssh commands 
712                 # last one minute each
713                 out, err = _communicate(proc, input=None)
714
715             elif stdout:
716                 out = proc.stdout.read()
717                 if proc.poll() and stderr:
718                     err = proc.stderr.read()
719
720             log(log_msg, logging.DEBUG, out, err)
721
722             if proc.poll():
723                 skip = False
724
725                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
726                     # SSH error, can safely retry
727                     skip = True 
728                 elif retry:
729                     # Probably timed out or plain failed but can retry
730                     skip = True 
731                 
732                 if skip:
733                     t = x*2
734                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
735                             t, x, " ".join(args))
736                     log(msg, logging.DEBUG)
737
738                     time.sleep(t)
739                     continue
740             break
741         except RuntimeError, e:
742             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
743             log(msg, logging.DEBUG, out, err)
744
745             if retry <= 0:
746                 raise
747             retry -= 1
748
749     return ((out, err), proc)
750
751 # POSIX
752 # Don't remove. The method communicate was re implemented for performance issues
753 def _communicate(proc, input, timeout=None, err_on_timeout=True):
754     read_set = []
755     write_set = []
756     stdout = None # Return
757     stderr = None # Return
758
759     killed = False
760
761     if timeout is not None:
762         timelimit = time.time() + timeout
763         killtime = timelimit + 4
764         bailtime = timelimit + 4
765
766     if proc.stdin:
767         # Flush stdio buffer.  This might block, if the user has
768         # been writing to .stdin in an uncontrolled fashion.
769         proc.stdin.flush()
770         if input:
771             write_set.append(proc.stdin)
772         else:
773             proc.stdin.close()
774
775     if proc.stdout:
776         read_set.append(proc.stdout)
777         stdout = []
778
779     if proc.stderr:
780         read_set.append(proc.stderr)
781         stderr = []
782
783     input_offset = 0
784     while read_set or write_set:
785         if timeout is not None:
786             curtime = time.time()
787             if timeout is None or curtime > timelimit:
788                 if curtime > bailtime:
789                     break
790                 elif curtime > killtime:
791                     signum = signal.SIGKILL
792                 else:
793                     signum = signal.SIGTERM
794                 # Lets kill it
795                 os.kill(proc.pid, signum)
796                 select_timeout = 0.5
797             else:
798                 select_timeout = timelimit - curtime + 0.1
799         else:
800             select_timeout = 1.0
801
802         if select_timeout > 1.0:
803             select_timeout = 1.0
804
805         try:
806             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
807         except select.error,e:
808             if e[0] != 4:
809                 raise
810             else:
811                 continue
812
813         if not rlist and not wlist and not xlist and proc.poll() is not None:
814             # timeout and process exited, say bye
815             break
816
817         if proc.stdin in wlist:
818             # When select has indicated that the file is writable,
819             # we can write up to PIPE_BUF bytes without risk
820             # blocking.  POSIX defines PIPE_BUF >= 512
821             bytes_written = os.write(proc.stdin.fileno(),
822                     buffer(input, input_offset, 512))
823             input_offset += bytes_written
824
825             if input_offset >= len(input):
826                 proc.stdin.close()
827                 write_set.remove(proc.stdin)
828
829         if proc.stdout in rlist:
830             data = os.read(proc.stdout.fileno(), 1024)
831             if data == "":
832                 proc.stdout.close()
833                 read_set.remove(proc.stdout)
834             stdout.append(data)
835
836         if proc.stderr in rlist:
837             data = os.read(proc.stderr.fileno(), 1024)
838             if data == "":
839                 proc.stderr.close()
840                 read_set.remove(proc.stderr)
841             stderr.append(data)
842
843     # All data exchanged.  Translate lists into strings.
844     if stdout is not None:
845         stdout = ''.join(stdout)
846     if stderr is not None:
847         stderr = ''.join(stderr)
848
849     # Translate newlines, if requested.  We cannot let the file
850     # object do the translation: It is based on stdio, which is
851     # impossible to combine with select (unless forcing no
852     # buffering).
853     if proc.universal_newlines and hasattr(file, 'newlines'):
854         if stdout:
855             stdout = proc._translate_newlines(stdout)
856         if stderr:
857             stderr = proc._translate_newlines(stderr)
858
859     if killed and err_on_timeout:
860         errcode = proc.poll()
861         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
862     else:
863         if killed:
864             proc.poll()
865         else:
866             proc.wait()
867         return (stdout, stderr)
868
869 def _proxy_command(gw, gwuser, gwidentity):
870     """
871     Constructs the SSH ProxyCommand option to add to the SSH command to connect
872     via a proxy
873         :param gw: SSH proxy hostname
874         :type gw:  str 
875        
876         :param gwuser: SSH proxy username
877         :type gwuser:  str
878
879         :param gwidentity: SSH proxy identity file 
880         :type gwidentity:  str
881
882   
883         :rtype: str 
884         
885         returns the SSH ProxyCommand option.
886     """
887
888     proxycommand = 'ProxyCommand=ssh -q '
889     if gwidentity:
890         proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
891     if gwuser:
892         proxycommand += '%s' % gwuser
893     else:
894         proxycommand += '%r'
895     proxycommand += '@%s -W %%h:%%p' % gw
896
897     return proxycommand
898