Fix bug strict host checking
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 ## TODO: This code needs reviewing !!!
22
23 import base64
24 import errno
25 import hashlib
26 import logging
27 import os
28 import os.path
29 import re
30 import select
31 import signal
32 import socket
33 import subprocess
34 import threading
35 import time
36 import tempfile
37
38 logger = logging.getLogger("sshfuncs")
39
40 def log(msg, level, out = None, err = None):
41     if out:
42         msg += " - OUT: %s " % out
43
44     if err:
45         msg += " - ERROR: %s " % err
46
47     logger.log(level, msg)
48
49 if hasattr(os, "devnull"):
50     DEV_NULL = os.devnull
51 else:
52     DEV_NULL = "/dev/null"
53
54 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
55
56 class STDOUT: 
57     """
58     Special value that when given to rspawn in stderr causes stderr to 
59     redirect to whatever stdout was redirected to.
60     """
61
62 class ProcStatus:
63     """
64     Codes for status of remote spawned process
65     """
66     # Process is still running
67     RUNNING = 1
68
69     # Process is finished
70     FINISHED = 2
71     
72     # Process hasn't started running yet (this should be very rare)
73     NOT_STARTED = 3
74
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
77
78 def gethostbyname(host):
79     global hostbyname_cache
80     global hostbyname_cache_lock
81     
82     hostbyname = hostbyname_cache.get(host)
83     if not hostbyname:
84         with hostbyname_cache_lock:
85             hostbyname = socket.gethostbyname(host)
86             hostbyname_cache[host] = hostbyname
87
88             msg = " Added hostbyname %s - %s " % (host, hostbyname)
89             log(msg, logging.DEBUG)
90
91     return hostbyname
92
93 OPENSSH_HAS_PERSIST = None
94
95 def openssh_has_persist():
96     """ The ssh_config options ControlMaster and ControlPersist allow to
97     reuse a same network connection for multiple ssh sessions. In this 
98     way limitations on number of open ssh connections can be bypassed.
99     However, older versions of openSSH do not support this feature.
100     This function is used to determine if ssh connection persist features
101     can be used.
102     """
103     global OPENSSH_HAS_PERSIST
104     if OPENSSH_HAS_PERSIST is None:
105         proc = subprocess.Popen(["ssh","-v"],
106             stdout = subprocess.PIPE,
107             stderr = subprocess.STDOUT,
108             stdin = open("/dev/null","r") )
109         out,err = proc.communicate()
110         proc.wait()
111         
112         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
113         OPENSSH_HAS_PERSIST = bool(vre.match(out))
114     return OPENSSH_HAS_PERSIST
115
116 def make_server_key_args(server_key, host, port):
117     """ Returns a reference to a temporary known_hosts file, to which 
118     the server key has been added. 
119     
120     Make sure to hold onto the temp file reference until the process is 
121     done with it
122
123     :param server_key: the server public key
124     :type server_key: str
125
126     :param host: the hostname
127     :type host: str
128
129     :param port: the ssh port
130     :type port: str
131
132     """
133     if port is not None:
134         host = '%s:%s' % (host, str(port))
135
136     # Create a temporary server key file
137     tmp_known_hosts = tempfile.NamedTemporaryFile()
138    
139     hostbyname = gethostbyname(host) 
140
141     # Add the intended host key
142     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
143     
144     # If we're not in strict mode, add user-configured keys
145     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
146         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
147         if os.access(user_hosts_path, os.R_OK):
148             f = open(user_hosts_path, "r")
149             tmp_known_hosts.write(f.read())
150             f.close()
151         
152     tmp_known_hosts.flush()
153     
154     return tmp_known_hosts
155
156 def make_control_path(agent, forward_x11):
157     ctrl_path = "/tmp/nepi_ssh"
158
159     if agent:
160         ctrl_path +="_a"
161
162     if forward_x11:
163         ctrl_path +="_x"
164
165     ctrl_path += "-%r@%h:%p"
166
167     return ctrl_path
168
169 def shell_escape(s):
170     """ Escapes strings so that they are safe to use as command-line 
171     arguments """
172     if SHELL_SAFE.match(s):
173         # safe string - no escaping needed
174         return s
175     else:
176         # unsafe string - escape
177         def escp(c):
178             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
179                 return c
180             else:
181                 return "'$'\\x%02x''" % (ord(c),)
182         s = ''.join(map(escp,s))
183         return "'%s'" % (s,)
184
185 def eintr_retry(func):
186     """Retries a function invocation when a EINTR occurs"""
187     import functools
188     @functools.wraps(func)
189     def rv(*p, **kw):
190         retry = kw.pop("_retry", False)
191         for i in xrange(0 if retry else 4):
192             try:
193                 return func(*p, **kw)
194             except (select.error, socket.error), args:
195                 if args[0] == errno.EINTR:
196                     continue
197                 else:
198                     raise 
199             except OSError, e:
200                 if e.errno == errno.EINTR:
201                     continue
202                 else:
203                     raise
204         else:
205             return func(*p, **kw)
206     return rv
207
208 def rexec(command, host, user, 
209         port = None,
210         gwuser = None,
211         gw = None, 
212         agent = True,
213         sudo = False,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         connect_timeout = 30,
219         retry = 3,
220         persistent = True,
221         forward_x11 = False,
222         blocking = True,
223         strict_host_checking = True):
224     """
225     Executes a remote command, returns ((stdout,stderr),process)
226     """
227
228     tmp_known_hosts = None
229     if not gw:
230         hostip = gethostbyname(host)
231     else: hostip = None
232
233     args = ['ssh', '-C',
234             # Don't bother with localhost. Makes test easier
235             '-o', 'NoHostAuthenticationForLocalhost=yes',
236             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
237             '-o', 'ConnectionAttempts=3',
238             '-o', 'ServerAliveInterval=30',
239             '-o', 'TCPKeepAlive=yes',
240             '-o', 'Batchmode=yes',
241             '-l', user, hostip or host]
242
243     if persistent and openssh_has_persist():
244         args.extend([
245             '-o', 'ControlMaster=auto',
246             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
247             '-o', 'ControlPersist=60' ])
248
249     if not strict_host_checking:
250         # Do not check for Host key. Unsafe.
251         args.extend(['-o', 'StrictHostKeyChecking=no'])
252
253     if gw:
254         if gwuser:
255             proxycommand = 'ProxyCommand=ssh -q %s@%s -W %%h:%%p' % (gwuser, gw)
256         else:
257             proxycommand = 'ProxyCommand=ssh -q %%r@%s -W %%h:%%p' % gw
258         args.extend(['-o', proxycommand])
259
260     if agent:
261         args.append('-A')
262
263     if port:
264         args.append('-p%d' % port)
265
266     if identity:
267         identity = os.path.expanduser(identity)
268         args.extend(('-i', identity))
269
270     if tty:
271         args.append('-t')
272         args.append('-t')
273
274     if forward_x11:
275         args.append('-X')
276
277     if server_key:
278         # Create a temporary server key file
279         tmp_known_hosts = make_server_key_args(server_key, host, port)
280         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
281
282     if sudo:
283         command = "sudo " + command
284
285     args.append(command)
286
287     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
288
289     stdout = stderr = stdin = subprocess.PIPE
290     if forward_x11:
291         stdout = stderr = stdin = None
292
293     return _retry_rexec(args, log_msg, 
294             stderr = stderr,
295             stdin = stdin,
296             stdout = stdout,
297             env = env, 
298             retry = retry, 
299             tmp_known_hosts = tmp_known_hosts,
300             blocking = blocking)
301
302 def rcopy(source, dest,
303         port = None,
304         gwuser = None,
305         gw = None,
306         recursive = False,
307         identity = None,
308         server_key = None,
309         retry = 3,
310         strict_host_checking = True):
311     """
312     Copies from/to remote sites.
313     
314     Source and destination should have the user and host encoded
315     as per scp specs.
316     
317     Source can be a list of files to copy to a single destination, 
318     (in which case it is advised that the destination be a folder),
319     or a single file in a string.
320     """
321
322     # Parse destination as <user>@<server>:<path>
323     if isinstance(dest, str) and ':' in dest:
324         remspec, path = dest.split(':',1)
325     elif isinstance(source, str) and ':' in source:
326         remspec, path = source.split(':',1)
327     else:
328         raise ValueError, "Both endpoints cannot be local"
329     user,host = remspec.rsplit('@',1)
330     
331     # plain scp
332     tmp_known_hosts = None
333
334     args = ['scp', '-q', '-p', '-C',
335             # Speed up transfer using blowfish cypher specification which is 
336             # faster than the default one (3des)
337             '-c', 'blowfish',
338             # Don't bother with localhost. Makes test easier
339             '-o', 'NoHostAuthenticationForLocalhost=yes',
340             '-o', 'ConnectTimeout=60',
341             '-o', 'ConnectionAttempts=3',
342             '-o', 'ServerAliveInterval=30',
343             '-o', 'TCPKeepAlive=yes' ]
344             
345     if port:
346         args.append('-P%d' % port)
347
348     if gw:
349         if gwuser:
350             proxycommand = 'ProxyCommand=ssh -q %s@%s -W %%h:%%p' % (gwuser, gw)
351         else:
352             proxycommand = 'ProxyCommand=ssh -q %%r@%s -W %%h:%%p' % gw
353         args.extend(['-o', proxycommand])
354
355     if recursive:
356         args.append('-r')
357
358     if identity:
359         identity = os.path.expanduser(identity)
360         args.extend(('-i', identity))
361
362     if server_key:
363         # Create a temporary server key file
364         tmp_known_hosts = make_server_key_args(server_key, host, port)
365         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
366
367     if not strict_host_checking:
368         # Do not check for Host key. Unsafe.
369         args.extend(['-o', 'StrictHostKeyChecking=no'])
370     
371     if isinstance(source, list):
372         args.extend(source)
373     else:
374         if openssh_has_persist():
375             args.extend([
376                 '-o', 'ControlMaster=auto',
377                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
378                 ])
379         args.append(source)
380
381     if isinstance(dest, list):
382         args.extend(dest)
383     else:
384         args.append(dest)
385
386     log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
387     
388     return _retry_rexec(args, log_msg, env = None, retry = retry, 
389             tmp_known_hosts = tmp_known_hosts,
390             blocking = True)
391
392 def rspawn(command, pidfile, 
393         stdout = '/dev/null', 
394         stderr = STDOUT, 
395         stdin = '/dev/null',
396         home = None, 
397         create_home = False, 
398         sudo = False,
399         host = None, 
400         port = None, 
401         user = None, 
402         gwuser = None,
403         gw = None,
404         agent = None, 
405         identity = None, 
406         server_key = None,
407         tty = False,
408         strict_host_checking = True):
409     """
410     Spawn a remote command such that it will continue working asynchronously in 
411     background. 
412
413         :param command: The command to run, it should be a single line.
414         :type command: str
415
416         :param pidfile: Path to a file where to store the pid and ppid of the 
417                         spawned process
418         :type pidfile: str
419
420         :param stdout: Path to file to redirect standard output. 
421                        The default value is /dev/null
422         :type stdout: str
423
424         :param stderr: Path to file to redirect standard error.
425                        If the special STDOUT value is used, stderr will 
426                        be redirected to the same file as stdout
427         :type stderr: str
428
429         :param stdin: Path to a file with input to be piped into the command's standard input
430         :type stdin: str
431
432         :param home: Path to working directory folder. 
433                     It is assumed to exist unless the create_home flag is set.
434         :type home: str
435
436         :param create_home: Flag to force creation of the home folder before 
437                             running the command
438         :type create_home: bool
439  
440         :param sudo: Flag forcing execution with sudo user
441         :type sudo: bool
442         
443         :rtype: touple
444
445         (stdout, stderr), process
446         
447         Of the spawning process, which only captures errors at spawning time.
448         Usually only useful for diagnostics.
449     """
450     # Start process in a "daemonized" way, using nohup and heavy
451     # stdin/out redirection to avoid connection issues
452     if stderr is STDOUT:
453         stderr = '&1'
454     else:
455         stderr = ' ' + stderr
456     
457     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
458         'command' : command,
459         'pidfile' : shell_escape(pidfile),
460         'stdout' : stdout,
461         'stderr' : stderr,
462         'stdin' : stdin,
463     }
464     
465     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
466             'command' : shell_escape(daemon_command),
467             'sudo' : 'sudo -S' if sudo else '',
468             'pidfile' : shell_escape(pidfile),
469             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
470             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
471         }
472
473     (out,err),proc = rexec(
474         cmd,
475         host = host,
476         port = port,
477         user = user,
478         gwuser = gwuser,
479         gw = gw,
480         agent = agent,
481         identity = identity,
482         server_key = server_key,
483         tty = tty,
484         strict_host_checking = strict_host_checking ,
485         )
486     
487     if proc.wait():
488         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
489
490     return ((out, err), proc)
491
492 @eintr_retry
493 def rgetpid(pidfile,
494         host = None, 
495         port = None, 
496         user = None, 
497         gwuser = None,
498         gw = None,
499         agent = None, 
500         identity = None,
501         server_key = None,
502         strict_host_checking = True):
503     """
504     Returns the pid and ppid of a process from a remote file where the 
505     information was stored.
506
507         :param home: Path to directory where the pidfile is located
508         :type home: str
509
510         :param pidfile: Name of file containing the pid information
511         :type pidfile: str
512         
513         :rtype: int
514         
515         A (pid, ppid) tuple useful for calling rstatus and rkill,
516         or None if the pidfile isn't valid yet (can happen when process is staring up)
517
518     """
519     (out,err),proc = rexec(
520         "cat %(pidfile)s" % {
521             'pidfile' : pidfile,
522         },
523         host = host,
524         port = port,
525         user = user,
526         gwuser = gwuser,
527         gw = gw,
528         agent = agent,
529         identity = identity,
530         server_key = server_key,
531         strict_host_checking = strict_host_checking
532         )
533         
534     if proc.wait():
535         return None
536     
537     if out:
538         try:
539             return map(int,out.strip().split(' ',1))
540         except:
541             # Ignore, many ways to fail that don't matter that much
542             return None
543
544 @eintr_retry
545 def rstatus(pid, ppid, 
546         host = None, 
547         port = None, 
548         user = None, 
549         gwuser = None,
550         gw = None,
551         agent = None, 
552         identity = None,
553         server_key = None,
554         strict_host_checking = True):
555     """
556     Returns a code representing the the status of a remote process
557
558         :param pid: Process id of the process
559         :type pid: int
560
561         :param ppid: Parent process id of process
562         :type ppid: int
563     
564         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
565     
566     """
567     (out,err),proc = rexec(
568         # Check only by pid. pid+ppid does not always work (especially with sudo) 
569         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
570             'ppid' : ppid,
571             'pid' : pid,
572         },
573         host = host,
574         port = port,
575         user = user,
576         gwuser = gwuser,
577         gw = gw,
578         agent = agent,
579         identity = identity,
580         server_key = server_key,
581         strict_host_checking = strict_host_checking
582         )
583     
584     if proc.wait():
585         return ProcStatus.NOT_STARTED
586     
587     status = False
588     if err:
589         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
590             status = True
591     elif out:
592         status = (out.strip() == 'wait')
593     else:
594         return ProcStatus.NOT_STARTED
595     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
596
597 @eintr_retry
598 def rkill(pid, ppid,
599         host = None, 
600         port = None, 
601         user = None, 
602         gwuser = None,
603         gw = None,
604         agent = None, 
605         sudo = False,
606         identity = None, 
607         server_key = None, 
608         nowait = False,
609         strict_host_checking = True):
610     """
611     Sends a kill signal to a remote process.
612
613     First tries a SIGTERM, and if the process does not end in 10 seconds,
614     it sends a SIGKILL.
615  
616         :param pid: Process id of process to be killed
617         :type pid: int
618
619         :param ppid: Parent process id of process to be killed
620         :type ppid: int
621
622         :param sudo: Flag indicating if sudo should be used to kill the process
623         :type sudo: bool
624         
625     """
626     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
627     cmd = """
628 SUBKILL="%(subkill)s" ;
629 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
630 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
631 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
632     sleep 0.2 
633     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
634         break
635     else
636         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
637         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
638     fi
639     sleep 1.8
640 done
641 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
642     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
643     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
644 fi
645 """
646     if nowait:
647         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
648
649     (out,err),proc = rexec(
650         cmd % {
651             'ppid' : ppid,
652             'pid' : pid,
653             'sudo' : 'sudo -S' if sudo else '',
654             'subkill' : subkill,
655         },
656         host = host,
657         port = port,
658         user = user,
659         gwuser = gwuser,
660         gw = gw,
661         agent = agent,
662         identity = identity,
663         server_key = server_key,
664         strict_host_checking = strict_host_checking
665         )
666     
667     # wait, don't leave zombies around
668     proc.wait()
669
670     return (out, err), proc
671
672 def _retry_rexec(args,
673         log_msg,
674         stdout = subprocess.PIPE,
675         stdin = subprocess.PIPE, 
676         stderr = subprocess.PIPE,
677         env = None,
678         retry = 3,
679         tmp_known_hosts = None,
680         blocking = True):
681
682     for x in xrange(retry):
683         # connects to the remote host and starts a remote connection
684         proc = subprocess.Popen(args,
685                 env = env,
686                 stdout = stdout,
687                 stdin = stdin, 
688                 stderr = stderr)
689         
690         # attach tempfile object to the process, to make sure the file stays
691         # alive until the process is finished with it
692         proc._known_hosts = tmp_known_hosts
693     
694         # The argument block == False forces to rexec to return immediately, 
695         # without blocking 
696         try:
697             err = out = " "
698             if blocking:
699                 #(out, err) = proc.communicate()
700                 # The method communicate was re implemented for performance issues
701                 # when using python subprocess communicate method the ssh commands 
702                 # last one minute each
703                 out, err = _communicate(proc, input=None)
704
705             elif stdout:
706                 out = proc.stdout.read()
707                 if proc.poll() and stderr:
708                     err = proc.stderr.read()
709
710             log(log_msg, logging.DEBUG, out, err)
711
712             if proc.poll():
713                 skip = False
714
715                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
716                     # SSH error, can safely retry
717                     skip = True 
718                 elif retry:
719                     # Probably timed out or plain failed but can retry
720                     skip = True 
721                 
722                 if skip:
723                     t = x*2
724                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
725                             t, x, " ".join(args))
726                     log(msg, logging.DEBUG)
727
728                     time.sleep(t)
729                     continue
730             break
731         except RuntimeError, e:
732             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
733             log(msg, logging.DEBUG, out, err)
734
735             if retry <= 0:
736                 raise
737             retry -= 1
738
739     return ((out, err), proc)
740
741 # POSIX
742 # Don't remove. The method communicate was re implemented for performance issues
743 def _communicate(proc, input, timeout=None, err_on_timeout=True):
744     read_set = []
745     write_set = []
746     stdout = None # Return
747     stderr = None # Return
748
749     killed = False
750
751     if timeout is not None:
752         timelimit = time.time() + timeout
753         killtime = timelimit + 4
754         bailtime = timelimit + 4
755
756     if proc.stdin:
757         # Flush stdio buffer.  This might block, if the user has
758         # been writing to .stdin in an uncontrolled fashion.
759         proc.stdin.flush()
760         if input:
761             write_set.append(proc.stdin)
762         else:
763             proc.stdin.close()
764
765     if proc.stdout:
766         read_set.append(proc.stdout)
767         stdout = []
768
769     if proc.stderr:
770         read_set.append(proc.stderr)
771         stderr = []
772
773     input_offset = 0
774     while read_set or write_set:
775         if timeout is not None:
776             curtime = time.time()
777             if timeout is None or curtime > timelimit:
778                 if curtime > bailtime:
779                     break
780                 elif curtime > killtime:
781                     signum = signal.SIGKILL
782                 else:
783                     signum = signal.SIGTERM
784                 # Lets kill it
785                 os.kill(proc.pid, signum)
786                 select_timeout = 0.5
787             else:
788                 select_timeout = timelimit - curtime + 0.1
789         else:
790             select_timeout = 1.0
791
792         if select_timeout > 1.0:
793             select_timeout = 1.0
794
795         try:
796             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
797         except select.error,e:
798             if e[0] != 4:
799                 raise
800             else:
801                 continue
802
803         if not rlist and not wlist and not xlist and proc.poll() is not None:
804             # timeout and process exited, say bye
805             break
806
807         if proc.stdin in wlist:
808             # When select has indicated that the file is writable,
809             # we can write up to PIPE_BUF bytes without risk
810             # blocking.  POSIX defines PIPE_BUF >= 512
811             bytes_written = os.write(proc.stdin.fileno(),
812                     buffer(input, input_offset, 512))
813             input_offset += bytes_written
814
815             if input_offset >= len(input):
816                 proc.stdin.close()
817                 write_set.remove(proc.stdin)
818
819         if proc.stdout in rlist:
820             data = os.read(proc.stdout.fileno(), 1024)
821             if data == "":
822                 proc.stdout.close()
823                 read_set.remove(proc.stdout)
824             stdout.append(data)
825
826         if proc.stderr in rlist:
827             data = os.read(proc.stderr.fileno(), 1024)
828             if data == "":
829                 proc.stderr.close()
830                 read_set.remove(proc.stderr)
831             stderr.append(data)
832
833     # All data exchanged.  Translate lists into strings.
834     if stdout is not None:
835         stdout = ''.join(stdout)
836     if stderr is not None:
837         stderr = ''.join(stderr)
838
839     # Translate newlines, if requested.  We cannot let the file
840     # object do the translation: It is based on stdio, which is
841     # impossible to combine with select (unless forcing no
842     # buffering).
843     if proc.universal_newlines and hasattr(file, 'newlines'):
844         if stdout:
845             stdout = proc._translate_newlines(stdout)
846         if stderr:
847             stderr = proc._translate_newlines(stderr)
848
849     if killed and err_on_timeout:
850         errcode = proc.poll()
851         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
852     else:
853         if killed:
854             proc.poll()
855         else:
856             proc.wait()
857         return (stdout, stderr)
858
859