83e2d7e4e726a490feb08737abedb71847035820
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Claudio Freire <claudio-daniel.freire@inria.fr>
19
20 ## TODO: This code needs reviewing !!!
21
22 import base64
23 import errno
24 import hashlib
25 import logging
26 import os
27 import os.path
28 import re
29 import select
30 import signal
31 import socket
32 import subprocess
33 import threading
34 import time
35 import tempfile
36
37 _re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
38
39 logger = logging.getLogger("sshfuncs")
40
41 def log(msg, level, out = None, err = None):
42     if out:
43         msg += " - OUT: %s " % out
44
45     if err:
46         msg += " - ERROR: %s " % err
47
48     logger.log(level, msg)
49
50 if hasattr(os, "devnull"):
51     DEV_NULL = os.devnull
52 else:
53     DEV_NULL = "/dev/null"
54
55 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
56
57 class STDOUT: 
58     """
59     Special value that when given to rspawn in stderr causes stderr to 
60     redirect to whatever stdout was redirected to.
61     """
62
63 class ProcStatus:
64     """
65     Codes for status of remote spawned process
66     """
67     # Process is still running
68     RUNNING = 1
69
70     # Process is finished
71     FINISHED = 2
72     
73     # Process hasn't started running yet (this should be very rare)
74     NOT_STARTED = 3
75
76 hostbyname_cache = dict()
77 hostbyname_cache_lock = threading.Lock()
78
79 def resolve_hostname(host):
80     ip = None
81
82     if host in ["localhost", "127.0.0.1", "::1"]:
83         p = subprocess.Popen("ip -o addr list", shell=True,
84                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
85         stdout, stderr = p.communicate()
86         m = _re_inet.findall(stdout)
87         ip = m[0][1].split("/")[0]
88     else:
89         ip = socket.gethostbyname(host)
90
91     return ip
92
93 def gethostbyname(host):
94     global hostbyname_cache
95     global hostbyname_cache_lock
96     
97     hostbyname = hostbyname_cache.get(host)
98     if not hostbyname:
99         with hostbyname_cache_lock:
100             hostbyname = resolve_hostname(host)
101             hostbyname_cache[host] = hostbyname
102
103             msg = " Added hostbyname %s - %s " % (host, hostbyname)
104             log(msg, logging.DEBUG)
105
106     return hostbyname
107
108 OPENSSH_HAS_PERSIST = None
109
110 def openssh_has_persist():
111     """ The ssh_config options ControlMaster and ControlPersist allow to
112     reuse a same network connection for multiple ssh sessions. In this 
113     way limitations on number of open ssh connections can be bypassed.
114     However, older versions of openSSH do not support this feature.
115     This function is used to determine if ssh connection persist features
116     can be used.
117     """
118     global OPENSSH_HAS_PERSIST
119     if OPENSSH_HAS_PERSIST is None:
120         proc = subprocess.Popen(["ssh","-v"],
121             stdout = subprocess.PIPE,
122             stderr = subprocess.STDOUT,
123             stdin = open("/dev/null","r") )
124         out,err = proc.communicate()
125         proc.wait()
126         
127         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
128         OPENSSH_HAS_PERSIST = bool(vre.match(out))
129     return OPENSSH_HAS_PERSIST
130
131 def make_server_key_args(server_key, host, port):
132     """ Returns a reference to a temporary known_hosts file, to which 
133     the server key has been added. 
134     
135     Make sure to hold onto the temp file reference until the process is 
136     done with it
137
138     :param server_key: the server public key
139     :type server_key: str
140
141     :param host: the hostname
142     :type host: str
143
144     :param port: the ssh port
145     :type port: str
146
147     """
148     if port is not None:
149         host = '%s:%s' % (host, str(port))
150
151     # Create a temporary server key file
152     tmp_known_hosts = tempfile.NamedTemporaryFile()
153    
154     hostbyname = gethostbyname(host) 
155
156     # Add the intended host key
157     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
158     
159     # If we're not in strict mode, add user-configured keys
160     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
161         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
162         if os.access(user_hosts_path, os.R_OK):
163             f = open(user_hosts_path, "r")
164             tmp_known_hosts.write(f.read())
165             f.close()
166         
167     tmp_known_hosts.flush()
168     
169     return tmp_known_hosts
170
171 def make_control_path(agent, forward_x11):
172     ctrl_path = "/tmp/nepi_ssh"
173
174     if agent:
175         ctrl_path +="_a"
176
177     if forward_x11:
178         ctrl_path +="_x"
179
180     ctrl_path += "-%r@%h:%p"
181
182     return ctrl_path
183
184 def shell_escape(s):
185     """ Escapes strings so that they are safe to use as command-line 
186     arguments """
187     if SHELL_SAFE.match(s):
188         # safe string - no escaping needed
189         return s
190     else:
191         # unsafe string - escape
192         def escp(c):
193             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
194                 return c
195             else:
196                 return "'$'\\x%02x''" % (ord(c),)
197         s = ''.join(map(escp,s))
198         return "'%s'" % (s,)
199
200 def eintr_retry(func):
201     """Retries a function invocation when a EINTR occurs"""
202     import functools
203     @functools.wraps(func)
204     def rv(*p, **kw):
205         retry = kw.pop("_retry", False)
206         for i in xrange(0 if retry else 4):
207             try:
208                 return func(*p, **kw)
209             except (select.error, socket.error), args:
210                 if args[0] == errno.EINTR:
211                     continue
212                 else:
213                     raise 
214             except OSError, e:
215                 if e.errno == errno.EINTR:
216                     continue
217                 else:
218                     raise
219         else:
220             return func(*p, **kw)
221     return rv
222
223 def rexec(command, host, user, 
224         port = None,
225         gwuser = None,
226         gw = None, 
227         agent = True,
228         sudo = False,
229         identity = None,
230         server_key = None,
231         env = None,
232         tty = False,
233         connect_timeout = 30,
234         retry = 3,
235         persistent = True,
236         forward_x11 = False,
237         blocking = True,
238         strict_host_checking = True):
239     """
240     Executes a remote command, returns ((stdout,stderr),process)
241     """
242
243     tmp_known_hosts = None
244     if not gw:
245         hostip = gethostbyname(host)
246     else: hostip = None
247
248     args = ['ssh', '-C',
249             # Don't bother with localhost. Makes test easier
250             '-o', 'NoHostAuthenticationForLocalhost=yes',
251             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
252             '-o', 'ConnectionAttempts=3',
253             '-o', 'ServerAliveInterval=30',
254             '-o', 'TCPKeepAlive=yes',
255             '-o', 'Batchmode=yes',
256             '-l', user, hostip or host]
257
258     if persistent and openssh_has_persist():
259         args.extend([
260             '-o', 'ControlMaster=auto',
261             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
262             '-o', 'ControlPersist=60' ])
263
264     if not strict_host_checking:
265         # Do not check for Host key. Unsafe.
266         args.extend(['-o', 'StrictHostKeyChecking=no'])
267
268     if gw:
269         proxycommand = _proxy_command(gw, gwuser, identity)
270         args.extend(['-o', proxycommand])
271
272     if agent:
273         args.append('-A')
274
275     if port:
276         args.append('-p%d' % port)
277
278     if identity:
279         identity = os.path.expanduser(identity)
280         args.extend(('-i', identity))
281
282     if tty:
283         args.append('-t')
284         args.append('-t')
285
286     if forward_x11:
287         args.append('-X')
288
289     if server_key:
290         # Create a temporary server key file
291         tmp_known_hosts = make_server_key_args(server_key, host, port)
292         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
293
294     if sudo:
295         command = "sudo " + command
296
297     args.append(command)
298
299     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
300
301     stdout = stderr = stdin = subprocess.PIPE
302     if forward_x11:
303         stdout = stderr = stdin = None
304
305     return _retry_rexec(args, log_msg, 
306             stderr = stderr,
307             stdin = stdin,
308             stdout = stdout,
309             env = env, 
310             retry = retry, 
311             tmp_known_hosts = tmp_known_hosts,
312             blocking = blocking)
313
314 def rcopy(source, dest,
315         port = None,
316         gwuser = None,
317         gw = None,
318         recursive = False,
319         identity = None,
320         server_key = None,
321         retry = 3,
322         strict_host_checking = True):
323     """
324     Copies from/to remote sites.
325     
326     Source and destination should have the user and host encoded
327     as per scp specs.
328     
329     Source can be a list of files to copy to a single destination, 
330     (in which case it is advised that the destination be a folder),
331     or a single file in a string.
332     """
333
334     # Parse destination as <user>@<server>:<path>
335     if isinstance(dest, str) and ':' in dest:
336         remspec, path = dest.split(':',1)
337     elif isinstance(source, str) and ':' in source:
338         remspec, path = source.split(':',1)
339     else:
340         raise ValueError, "Both endpoints cannot be local"
341     user,host = remspec.rsplit('@',1)
342     
343     # plain scp
344     tmp_known_hosts = None
345
346     args = ['scp', '-q', '-p', '-C',
347             # Speed up transfer using blowfish cypher specification which is 
348             # faster than the default one (3des)
349             '-c', 'blowfish',
350             # Don't bother with localhost. Makes test easier
351             '-o', 'NoHostAuthenticationForLocalhost=yes',
352             '-o', 'ConnectTimeout=60',
353             '-o', 'ConnectionAttempts=3',
354             '-o', 'ServerAliveInterval=30',
355             '-o', 'TCPKeepAlive=yes' ]
356             
357     if port:
358         args.append('-P%d' % port)
359
360     if gw:
361         proxycommand = _proxy_command(gw, gwuser, identity)
362         args.extend(['-o', proxycommand])
363
364     if recursive:
365         args.append('-r')
366
367     if identity:
368         identity = os.path.expanduser(identity)
369         args.extend(('-i', identity))
370
371     if server_key:
372         # Create a temporary server key file
373         tmp_known_hosts = make_server_key_args(server_key, host, port)
374         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
375
376     if not strict_host_checking:
377         # Do not check for Host key. Unsafe.
378         args.extend(['-o', 'StrictHostKeyChecking=no'])
379     
380     if isinstance(source, list):
381         args.extend(source)
382     else:
383         if openssh_has_persist():
384             args.extend([
385                 '-o', 'ControlMaster=auto',
386                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
387                 ])
388         args.append(source)
389
390     if isinstance(dest, list):
391         args.extend(dest)
392     else:
393         args.append(dest)
394
395     log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
396     
397     return _retry_rexec(args, log_msg, env = None, retry = retry, 
398             tmp_known_hosts = tmp_known_hosts,
399             blocking = True)
400
401 def rspawn(command, pidfile, 
402         stdout = '/dev/null', 
403         stderr = STDOUT, 
404         stdin = '/dev/null',
405         home = None, 
406         create_home = False, 
407         sudo = False,
408         host = None, 
409         port = None, 
410         user = None, 
411         gwuser = None,
412         gw = None,
413         agent = None, 
414         identity = None, 
415         server_key = None,
416         tty = False,
417         strict_host_checking = True):
418     """
419     Spawn a remote command such that it will continue working asynchronously in 
420     background. 
421
422         :param command: The command to run, it should be a single line.
423         :type command: str
424
425         :param pidfile: Path to a file where to store the pid and ppid of the 
426                         spawned process
427         :type pidfile: str
428
429         :param stdout: Path to file to redirect standard output. 
430                        The default value is /dev/null
431         :type stdout: str
432
433         :param stderr: Path to file to redirect standard error.
434                        If the special STDOUT value is used, stderr will 
435                        be redirected to the same file as stdout
436         :type stderr: str
437
438         :param stdin: Path to a file with input to be piped into the command's standard input
439         :type stdin: str
440
441         :param home: Path to working directory folder. 
442                     It is assumed to exist unless the create_home flag is set.
443         :type home: str
444
445         :param create_home: Flag to force creation of the home folder before 
446                             running the command
447         :type create_home: bool
448  
449         :param sudo: Flag forcing execution with sudo user
450         :type sudo: bool
451         
452         :rtype: tuple
453
454         (stdout, stderr), process
455         
456         Of the spawning process, which only captures errors at spawning time.
457         Usually only useful for diagnostics.
458     """
459     # Start process in a "daemonized" way, using nohup and heavy
460     # stdin/out redirection to avoid connection issues
461     if stderr is STDOUT:
462         stderr = '&1'
463     else:
464         stderr = ' ' + stderr
465     
466     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
467         'command' : command,
468         'pidfile' : shell_escape(pidfile),
469         'stdout' : stdout,
470         'stderr' : stderr,
471         'stdin' : stdin,
472     }
473     
474     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
475             'command' : shell_escape(daemon_command),
476             'sudo' : 'sudo -S' if sudo else '',
477             'pidfile' : shell_escape(pidfile),
478             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
479             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
480         }
481
482     (out,err),proc = rexec(
483         cmd,
484         host = host,
485         port = port,
486         user = user,
487         gwuser = gwuser,
488         gw = gw,
489         agent = agent,
490         identity = identity,
491         server_key = server_key,
492         tty = tty,
493         strict_host_checking = strict_host_checking ,
494         )
495     
496     if proc.wait():
497         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
498
499     return ((out, err), proc)
500
501 @eintr_retry
502 def rgetpid(pidfile,
503         host = None, 
504         port = None, 
505         user = None, 
506         gwuser = None,
507         gw = None,
508         agent = None, 
509         identity = None,
510         server_key = None,
511         strict_host_checking = True):
512     """
513     Returns the pid and ppid of a process from a remote file where the 
514     information was stored.
515
516         :param home: Path to directory where the pidfile is located
517         :type home: str
518
519         :param pidfile: Name of file containing the pid information
520         :type pidfile: str
521         
522         :rtype: int
523         
524         A (pid, ppid) tuple useful for calling rstatus and rkill,
525         or None if the pidfile isn't valid yet (can happen when process is staring up)
526
527     """
528     (out,err),proc = rexec(
529         "cat %(pidfile)s" % {
530             'pidfile' : pidfile,
531         },
532         host = host,
533         port = port,
534         user = user,
535         gwuser = gwuser,
536         gw = gw,
537         agent = agent,
538         identity = identity,
539         server_key = server_key,
540         strict_host_checking = strict_host_checking
541         )
542         
543     if proc.wait():
544         return None
545     
546     if out:
547         try:
548             return map(int,out.strip().split(' ',1))
549         except:
550             # Ignore, many ways to fail that don't matter that much
551             return None
552
553 @eintr_retry
554 def rstatus(pid, ppid, 
555         host = None, 
556         port = None, 
557         user = None, 
558         gwuser = None,
559         gw = None,
560         agent = None, 
561         identity = None,
562         server_key = None,
563         strict_host_checking = True):
564     """
565     Returns a code representing the the status of a remote process
566
567         :param pid: Process id of the process
568         :type pid: int
569
570         :param ppid: Parent process id of process
571         :type ppid: int
572     
573         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
574     
575     """
576     (out,err),proc = rexec(
577         # Check only by pid. pid+ppid does not always work (especially with sudo) 
578         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
579             'ppid' : ppid,
580             'pid' : pid,
581         },
582         host = host,
583         port = port,
584         user = user,
585         gwuser = gwuser,
586         gw = gw,
587         agent = agent,
588         identity = identity,
589         server_key = server_key,
590         strict_host_checking = strict_host_checking
591         )
592     
593     if proc.wait():
594         return ProcStatus.NOT_STARTED
595     
596     status = False
597     if err:
598         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
599             status = True
600     elif out:
601         status = (out.strip() == 'wait')
602     else:
603         return ProcStatus.NOT_STARTED
604     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
605
606 @eintr_retry
607 def rkill(pid, ppid,
608         host = None, 
609         port = None, 
610         user = None, 
611         gwuser = None,
612         gw = None,
613         agent = None, 
614         sudo = False,
615         identity = None, 
616         server_key = None, 
617         nowait = False,
618         strict_host_checking = True):
619     """
620     Sends a kill signal to a remote process.
621
622     First tries a SIGTERM, and if the process does not end in 10 seconds,
623     it sends a SIGKILL.
624  
625         :param pid: Process id of process to be killed
626         :type pid: int
627
628         :param ppid: Parent process id of process to be killed
629         :type ppid: int
630
631         :param sudo: Flag indicating if sudo should be used to kill the process
632         :type sudo: bool
633         
634     """
635     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
636     cmd = """
637 SUBKILL="%(subkill)s" ;
638 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
639 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
640 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
641     sleep 0.2 
642     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
643         break
644     else
645         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
646         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
647     fi
648     sleep 1.8
649 done
650 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
651     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
652     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
653 fi
654 """
655     if nowait:
656         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
657
658     (out,err),proc = rexec(
659         cmd % {
660             'ppid' : ppid,
661             'pid' : pid,
662             'sudo' : 'sudo -S' if sudo else '',
663             'subkill' : subkill,
664         },
665         host = host,
666         port = port,
667         user = user,
668         gwuser = gwuser,
669         gw = gw,
670         agent = agent,
671         identity = identity,
672         server_key = server_key,
673         strict_host_checking = strict_host_checking
674         )
675     
676     # wait, don't leave zombies around
677     proc.wait()
678
679     return (out, err), proc
680
681 def _retry_rexec(args,
682         log_msg,
683         stdout = subprocess.PIPE,
684         stdin = subprocess.PIPE, 
685         stderr = subprocess.PIPE,
686         env = None,
687         retry = 3,
688         tmp_known_hosts = None,
689         blocking = True):
690
691     for x in xrange(retry):
692         # connects to the remote host and starts a remote connection
693         proc = subprocess.Popen(args,
694                 env = env,
695                 stdout = stdout,
696                 stdin = stdin, 
697                 stderr = stderr)
698         
699         # attach tempfile object to the process, to make sure the file stays
700         # alive until the process is finished with it
701         proc._known_hosts = tmp_known_hosts
702     
703         # The argument block == False forces to rexec to return immediately, 
704         # without blocking 
705         try:
706             err = out = " "
707             if blocking:
708                 #(out, err) = proc.communicate()
709                 # The method communicate was re implemented for performance issues
710                 # when using python subprocess communicate method the ssh commands 
711                 # last one minute each
712                 out, err = _communicate(proc, input=None)
713
714             elif stdout:
715                 out = proc.stdout.read()
716                 if proc.poll() and stderr:
717                     err = proc.stderr.read()
718
719             log(log_msg, logging.DEBUG, out, err)
720
721             if proc.poll():
722                 skip = False
723
724                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
725                     # SSH error, can safely retry
726                     skip = True 
727                 elif retry:
728                     # Probably timed out or plain failed but can retry
729                     skip = True 
730                 
731                 if skip:
732                     t = x*2
733                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
734                             t, x, " ".join(args))
735                     log(msg, logging.DEBUG)
736
737                     time.sleep(t)
738                     continue
739             break
740         except RuntimeError, e:
741             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
742             log(msg, logging.DEBUG, out, err)
743
744             if retry <= 0:
745                 raise
746             retry -= 1
747
748     return ((out, err), proc)
749
750 # POSIX
751 # Don't remove. The method communicate was re implemented for performance issues
752 def _communicate(proc, input, timeout=None, err_on_timeout=True):
753     read_set = []
754     write_set = []
755     stdout = None # Return
756     stderr = None # Return
757
758     killed = False
759
760     if timeout is not None:
761         timelimit = time.time() + timeout
762         killtime = timelimit + 4
763         bailtime = timelimit + 4
764
765     if proc.stdin:
766         # Flush stdio buffer.  This might block, if the user has
767         # been writing to .stdin in an uncontrolled fashion.
768         proc.stdin.flush()
769         if input:
770             write_set.append(proc.stdin)
771         else:
772             proc.stdin.close()
773
774     if proc.stdout:
775         read_set.append(proc.stdout)
776         stdout = []
777
778     if proc.stderr:
779         read_set.append(proc.stderr)
780         stderr = []
781
782     input_offset = 0
783     while read_set or write_set:
784         if timeout is not None:
785             curtime = time.time()
786             if timeout is None or curtime > timelimit:
787                 if curtime > bailtime:
788                     break
789                 elif curtime > killtime:
790                     signum = signal.SIGKILL
791                 else:
792                     signum = signal.SIGTERM
793                 # Lets kill it
794                 os.kill(proc.pid, signum)
795                 select_timeout = 0.5
796             else:
797                 select_timeout = timelimit - curtime + 0.1
798         else:
799             select_timeout = 1.0
800
801         if select_timeout > 1.0:
802             select_timeout = 1.0
803
804         try:
805             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
806         except select.error,e:
807             if e[0] != 4:
808                 raise
809             else:
810                 continue
811
812         if not rlist and not wlist and not xlist and proc.poll() is not None:
813             # timeout and process exited, say bye
814             break
815
816         if proc.stdin in wlist:
817             # When select has indicated that the file is writable,
818             # we can write up to PIPE_BUF bytes without risk
819             # blocking.  POSIX defines PIPE_BUF >= 512
820             bytes_written = os.write(proc.stdin.fileno(),
821                     buffer(input, input_offset, 512))
822             input_offset += bytes_written
823
824             if input_offset >= len(input):
825                 proc.stdin.close()
826                 write_set.remove(proc.stdin)
827
828         if proc.stdout in rlist:
829             data = os.read(proc.stdout.fileno(), 1024)
830             if data == "":
831                 proc.stdout.close()
832                 read_set.remove(proc.stdout)
833             stdout.append(data)
834
835         if proc.stderr in rlist:
836             data = os.read(proc.stderr.fileno(), 1024)
837             if data == "":
838                 proc.stderr.close()
839                 read_set.remove(proc.stderr)
840             stderr.append(data)
841
842     # All data exchanged.  Translate lists into strings.
843     if stdout is not None:
844         stdout = ''.join(stdout)
845     if stderr is not None:
846         stderr = ''.join(stderr)
847
848     # Translate newlines, if requested.  We cannot let the file
849     # object do the translation: It is based on stdio, which is
850     # impossible to combine with select (unless forcing no
851     # buffering).
852     if proc.universal_newlines and hasattr(file, 'newlines'):
853         if stdout:
854             stdout = proc._translate_newlines(stdout)
855         if stderr:
856             stderr = proc._translate_newlines(stderr)
857
858     if killed and err_on_timeout:
859         errcode = proc.poll()
860         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
861     else:
862         if killed:
863             proc.poll()
864         else:
865             proc.wait()
866         return (stdout, stderr)
867
868 def _proxy_command(gw, gwuser, gwidentity):
869     """
870     Constructs the SSH ProxyCommand option to add to the SSH command to connect
871     via a proxy
872         :param gw: SSH proxy hostname
873         :type gw:  str 
874        
875         :param gwuser: SSH proxy username
876         :type gwuser:  str
877
878         :param gwidentity: SSH proxy identity file 
879         :type gwidentity:  str
880
881   
882         :rtype: str 
883         
884         returns the SSH ProxyCommand option.
885     """
886
887     proxycommand = 'ProxyCommand=ssh -q '
888     if gwidentity:
889         proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
890     if gwuser:
891         proxycommand += '%s' % gwuser
892     else:
893         proxycommand += '%r'
894     proxycommand += '@%s -W %%h:%%p' % gw
895
896     return proxycommand
897