Fix #26 [NEPI] [BUG] SSH identity doesn't expand '~'
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None,
209         gwuser = None,
210         gw = None, 
211         agent = True,
212         sudo = False,
213         stdin = None,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         timeout = None,
219         retry = 3,
220         err_on_timeout = True,
221         connect_timeout = 30,
222         persistent = True,
223         forward_x11 = False,
224         blocking = True,
225         strict_host_checking = True):
226     """
227     Executes a remote command, returns ((stdout,stderr),process)
228     """
229     
230     tmp_known_hosts = None
231     if not gw:
232         hostip = gethostbyname(host)
233     else: hostip = None
234
235     args = ['ssh', '-C',
236             # Don't bother with localhost. Makes test easier
237             '-o', 'NoHostAuthenticationForLocalhost=yes',
238             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239             '-o', 'ConnectionAttempts=3',
240             '-o', 'ServerAliveInterval=30',
241             '-o', 'TCPKeepAlive=yes',
242             '-o', 'Batchmode=yes',
243             '-l', user, hostip or host]
244
245     if persistent and openssh_has_persist():
246         args.extend([
247             '-o', 'ControlMaster=auto',
248             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
249             '-o', 'ControlPersist=60' ])
250
251     if not strict_host_checking:
252         # Do not check for Host key. Unsafe.
253         args.extend(['-o', 'StrictHostKeyChecking=no'])
254
255     if gw:
256         if gwuser:
257             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
258         else:
259             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
260         args.extend(['-o', proxycommand])
261
262     if agent:
263         args.append('-A')
264
265     if port:
266         args.append('-p%d' % port)
267
268     if identity:
269         identity = os.path.expanduser(identity)
270         args.extend(('-i', identity))
271
272     if tty:
273         args.append('-t')
274         args.append('-t')
275
276     if forward_x11:
277         args.append('-X')
278
279     if server_key:
280         # Create a temporary server key file
281         tmp_known_hosts = make_server_key_args(server_key, host, port)
282         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
283
284     if sudo:
285         command = "sudo " + command
286
287     args.append(command)
288
289     for x in xrange(retry):
290         # connects to the remote host and starts a remote connection
291         proc = subprocess.Popen(args,
292                 env = env,
293                 stdout = subprocess.PIPE,
294                 stdin = subprocess.PIPE, 
295                 stderr = subprocess.PIPE)
296        
297         # attach tempfile object to the process, to make sure the file stays
298         # alive until the process is finished with it
299         proc._known_hosts = tmp_known_hosts
300     
301         # by default, rexec calls _communicate which will block 
302         # until the process has exit. The argument block == False 
303         # forces to rexec to return immediately, without blocking 
304         try:
305             if blocking:
306                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
307             else:
308                 err = proc.stderr.read()
309                 out = proc.stdout.read()
310
311             msg = " rexec - host %s - command %s " % (host, " ".join(args))
312             log(msg, logging.DEBUG, out, err)
313
314             if proc.poll():
315                 skip = False
316
317                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
318                     # SSH error, can safely retry
319                     skip = True 
320                 elif retry:
321                     # Probably timed out or plain failed but can retry
322                     skip = True 
323                 
324                 if skip:
325                     t = x*2
326                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
327                             t, x, host, " ".join(args))
328                     log(msg, logging.DEBUG)
329
330                     time.sleep(t)
331                     continue
332             break
333         except RuntimeError, e:
334             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
335             log(msg, logging.DEBUG, out, err)
336
337             if retry <= 0:
338                 raise
339             retry -= 1
340         
341     return ((out, err), proc)
342
343 def rcopy(source, dest,
344         port = None,
345         gwuser = None,
346         gw = None,
347         agent = True, 
348         recursive = False,
349         identity = None,
350         server_key = None,
351         retry = 3,
352         strict_host_checking = True):
353     """
354     Copies from/to remote sites.
355     
356     Source and destination should have the user and host encoded
357     as per scp specs.
358     
359     Source can be a list of files to copy to a single destination, 
360     (in which case it is advised that the destination be a folder),
361     a single file in a string or a semi-colon separated list of files
362     in a string.
363     """
364
365     # Parse destination as <user>@<server>:<path>
366     if isinstance(dest, str) and ':' in dest:
367         remspec, path = dest.split(':',1)
368     elif isinstance(source, str) and ':' in source:
369         remspec, path = source.split(':',1)
370     else:
371         raise ValueError, "Both endpoints cannot be local"
372     user,host = remspec.rsplit('@',1)
373     
374     # plain scp
375     tmp_known_hosts = None
376
377     args = ['scp', '-q', '-p', '-C',
378             # Speed up transfer using blowfish cypher specification which is 
379             # faster than the default one (3des)
380             '-c', 'blowfish',
381             # Don't bother with localhost. Makes test easier
382             '-o', 'NoHostAuthenticationForLocalhost=yes',
383             '-o', 'ConnectTimeout=60',
384             '-o', 'ConnectionAttempts=3',
385             '-o', 'ServerAliveInterval=30',
386             '-o', 'TCPKeepAlive=yes' ]
387             
388     if port:
389         args.append('-P%d' % port)
390
391     if gw:
392         if gwuser:
393             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
394         else:
395             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
396         args.extend(['-o', proxycommand])
397
398     if recursive:
399         args.append('-r')
400
401     if identity:
402         identity = os.path.expanduser(identity)
403         args.extend(('-i', identity))
404
405     if server_key:
406         # Create a temporary server key file
407         tmp_known_hosts = make_server_key_args(server_key, host, port)
408         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
409
410     if not strict_host_checking:
411         # Do not check for Host key. Unsafe.
412         args.extend(['-o', 'StrictHostKeyChecking=no'])
413
414     if openssh_has_persist():
415         args.extend([
416             '-o', 'ControlMaster=auto',
417             '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
418             ])
419
420     if isinstance(dest, str):
421         dest = map(str.strip, dest.split(";"))
422
423     if isinstance(source, str):
424         source = map(str.strip, source.split(";"))
425
426     args.extend(source)
427
428     args.extend(dest)
429
430     for x in xrange(retry):
431         # connects to the remote host and starts a remote connection
432         proc = subprocess.Popen(args,
433                 stdout = subprocess.PIPE,
434                 stdin = subprocess.PIPE, 
435                 stderr = subprocess.PIPE)
436         
437         # attach tempfile object to the process, to make sure the file stays
438         # alive until the process is finished with it
439         proc._known_hosts = tmp_known_hosts
440     
441         try:
442             (out, err) = proc.communicate()
443             eintr_retry(proc.wait)()
444             msg = " rcopy - host %s - command %s " % (host, " ".join(args))
445             log(msg, logging.DEBUG, out, err)
446
447             if proc.poll():
448                 t = x*2
449                 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
450                         t, x, host, " ".join(args))
451                 log(msg, logging.DEBUG)
452
453                 time.sleep(t)
454                 continue
455
456             break
457         except RuntimeError, e:
458             msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
459             log(msg, logging.DEBUG, out, err)
460
461             if retry <= 0:
462                 raise
463             retry -= 1
464         
465     return ((out, err), proc)
466
467 def rspawn(command, pidfile, 
468         stdout = '/dev/null', 
469         stderr = STDOUT, 
470         stdin = '/dev/null',
471         home = None, 
472         create_home = False, 
473         sudo = False,
474         host = None, 
475         port = None, 
476         user = None, 
477         gwuser = None,
478         gw = None,
479         agent = None, 
480         identity = None, 
481         server_key = None,
482         tty = False):
483     """
484     Spawn a remote command such that it will continue working asynchronously in 
485     background. 
486
487         :param command: The command to run, it should be a single line.
488         :type command: str
489
490         :param pidfile: Path to a file where to store the pid and ppid of the 
491                         spawned process
492         :type pidfile: str
493
494         :param stdout: Path to file to redirect standard output. 
495                        The default value is /dev/null
496         :type stdout: str
497
498         :param stderr: Path to file to redirect standard error.
499                        If the special STDOUT value is used, stderr will 
500                        be redirected to the same file as stdout
501         :type stderr: str
502
503         :param stdin: Path to a file with input to be piped into the command's standard input
504         :type stdin: str
505
506         :param home: Path to working directory folder. 
507                     It is assumed to exist unless the create_home flag is set.
508         :type home: str
509
510         :param create_home: Flag to force creation of the home folder before 
511                             running the command
512         :type create_home: bool
513  
514         :param sudo: Flag forcing execution with sudo user
515         :type sudo: bool
516         
517         :rtype: touple
518
519         (stdout, stderr), process
520         
521         Of the spawning process, which only captures errors at spawning time.
522         Usually only useful for diagnostics.
523     """
524     # Start process in a "daemonized" way, using nohup and heavy
525     # stdin/out redirection to avoid connection issues
526     if stderr is STDOUT:
527         stderr = '&1'
528     else:
529         stderr = ' ' + stderr
530     
531     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
532         'command' : command,
533         'pidfile' : shell_escape(pidfile),
534         'stdout' : stdout,
535         'stderr' : stderr,
536         'stdin' : stdin,
537     }
538     
539     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
540             'command' : shell_escape(daemon_command),
541             'sudo' : 'sudo -S' if sudo else '',
542             'pidfile' : shell_escape(pidfile),
543             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
544             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
545         }
546
547     (out,err),proc = rexec(
548         cmd,
549         host = host,
550         port = port,
551         user = user,
552         gwuser = gwuser,
553         gw = gw,
554         agent = agent,
555         identity = identity,
556         server_key = server_key,
557         tty = tty ,
558         )
559     
560     if proc.wait():
561         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
562
563     return ((out, err), proc)
564
565 @eintr_retry
566 def rgetpid(pidfile,
567         host = None, 
568         port = None, 
569         user = None, 
570         gwuser = None,
571         gw = None,
572         agent = None, 
573         identity = None,
574         server_key = None):
575     """
576     Returns the pid and ppid of a process from a remote file where the 
577     information was stored.
578
579         :param home: Path to directory where the pidfile is located
580         :type home: str
581
582         :param pidfile: Name of file containing the pid information
583         :type pidfile: str
584         
585         :rtype: int
586         
587         A (pid, ppid) tuple useful for calling rstatus and rkill,
588         or None if the pidfile isn't valid yet (can happen when process is staring up)
589
590     """
591     (out,err),proc = rexec(
592         "cat %(pidfile)s" % {
593             'pidfile' : pidfile,
594         },
595         host = host,
596         port = port,
597         user = user,
598         gwuser = gwuser,
599         gw = gw,
600         agent = agent,
601         identity = identity,
602         server_key = server_key
603         )
604         
605     if proc.wait():
606         return None
607     
608     if out:
609         try:
610             return map(int,out.strip().split(' ',1))
611         except:
612             # Ignore, many ways to fail that don't matter that much
613             return None
614
615 @eintr_retry
616 def rstatus(pid, ppid, 
617         host = None, 
618         port = None, 
619         user = None, 
620         gwuser = None,
621         gw = None,
622         agent = None, 
623         identity = None,
624         server_key = None):
625     """
626     Returns a code representing the the status of a remote process
627
628         :param pid: Process id of the process
629         :type pid: int
630
631         :param ppid: Parent process id of process
632         :type ppid: int
633     
634         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
635     
636     """
637     (out,err),proc = rexec(
638         # Check only by pid. pid+ppid does not always work (especially with sudo) 
639         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
640             'ppid' : ppid,
641             'pid' : pid,
642         },
643         host = host,
644         port = port,
645         user = user,
646         gwuser = gwuser,
647         gw = gw,
648         agent = agent,
649         identity = identity,
650         server_key = server_key
651         )
652     
653     if proc.wait():
654         return ProcStatus.NOT_STARTED
655     
656     status = False
657     if err:
658         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
659             status = True
660     elif out:
661         status = (out.strip() == 'wait')
662     else:
663         return ProcStatus.NOT_STARTED
664     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
665
666 @eintr_retry
667 def rkill(pid, ppid,
668         host = None, 
669         port = None, 
670         user = None, 
671         gwuser = None,
672         gw = None,
673         agent = None, 
674         sudo = False,
675         identity = None, 
676         server_key = None, 
677         nowait = False):
678     """
679     Sends a kill signal to a remote process.
680
681     First tries a SIGTERM, and if the process does not end in 10 seconds,
682     it sends a SIGKILL.
683  
684         :param pid: Process id of process to be killed
685         :type pid: int
686
687         :param ppid: Parent process id of process to be killed
688         :type ppid: int
689
690         :param sudo: Flag indicating if sudo should be used to kill the process
691         :type sudo: bool
692         
693     """
694     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
695     cmd = """
696 SUBKILL="%(subkill)s" ;
697 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
698 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
699 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
700     sleep 0.2 
701     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
702         break
703     else
704         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
705         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
706     fi
707     sleep 1.8
708 done
709 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
710     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
711     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
712 fi
713 """
714     if nowait:
715         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
716
717     (out,err),proc = rexec(
718         cmd % {
719             'ppid' : ppid,
720             'pid' : pid,
721             'sudo' : 'sudo -S' if sudo else '',
722             'subkill' : subkill,
723         },
724         host = host,
725         port = port,
726         user = user,
727         gwuser = gwuser,
728         gw = gw,
729         agent = agent,
730         identity = identity,
731         server_key = server_key
732         )
733     
734     # wait, don't leave zombies around
735     proc.wait()
736
737     return (out, err), proc
738
739 # POSIX
740 def _communicate(proc, input, timeout=None, err_on_timeout=True):
741     read_set = []
742     write_set = []
743     stdout = None # Return
744     stderr = None # Return
745     
746     killed = False
747     
748     if timeout is not None:
749         timelimit = time.time() + timeout
750         killtime = timelimit + 4
751         bailtime = timelimit + 4
752
753     if proc.stdin:
754         # Flush stdio buffer.  This might block, if the user has
755         # been writing to .stdin in an uncontrolled fashion.
756         proc.stdin.flush()
757         if input:
758             write_set.append(proc.stdin)
759         else:
760             proc.stdin.close()
761
762     if proc.stdout:
763         read_set.append(proc.stdout)
764         stdout = []
765
766     if proc.stderr:
767         read_set.append(proc.stderr)
768         stderr = []
769
770     input_offset = 0
771     while read_set or write_set:
772         if timeout is not None:
773             curtime = time.time()
774             if timeout is None or curtime > timelimit:
775                 if curtime > bailtime:
776                     break
777                 elif curtime > killtime:
778                     signum = signal.SIGKILL
779                 else:
780                     signum = signal.SIGTERM
781                 # Lets kill it
782                 os.kill(proc.pid, signum)
783                 select_timeout = 0.5
784             else:
785                 select_timeout = timelimit - curtime + 0.1
786         else:
787             select_timeout = 1.0
788         
789         if select_timeout > 1.0:
790             select_timeout = 1.0
791             
792         try:
793             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
794         except select.error,e:
795             if e[0] != 4:
796                 raise
797             else:
798                 continue
799         
800         if not rlist and not wlist and not xlist and proc.poll() is not None:
801             # timeout and process exited, say bye
802             break
803
804         if proc.stdin in wlist:
805             # When select has indicated that the file is writable,
806             # we can write up to PIPE_BUF bytes without risk
807             # blocking.  POSIX defines PIPE_BUF >= 512
808             bytes_written = os.write(proc.stdin.fileno(),
809                     buffer(input, input_offset, 512))
810             input_offset += bytes_written
811
812             if input_offset >= len(input):
813                 proc.stdin.close()
814                 write_set.remove(proc.stdin)
815
816         if proc.stdout in rlist:
817             data = os.read(proc.stdout.fileno(), 1024)
818             if data == "":
819                 proc.stdout.close()
820                 read_set.remove(proc.stdout)
821             stdout.append(data)
822
823         if proc.stderr in rlist:
824             data = os.read(proc.stderr.fileno(), 1024)
825             if data == "":
826                 proc.stderr.close()
827                 read_set.remove(proc.stderr)
828             stderr.append(data)
829     
830     # All data exchanged.  Translate lists into strings.
831     if stdout is not None:
832         stdout = ''.join(stdout)
833     if stderr is not None:
834         stderr = ''.join(stderr)
835
836     # Translate newlines, if requested.  We cannot let the file
837     # object do the translation: It is based on stdio, which is
838     # impossible to combine with select (unless forcing no
839     # buffering).
840     if proc.universal_newlines and hasattr(file, 'newlines'):
841         if stdout:
842             stdout = proc._translate_newlines(stdout)
843         if stderr:
844             stderr = proc._translate_newlines(stderr)
845
846     if killed and err_on_timeout:
847         errcode = proc.poll()
848         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
849     else:
850         if killed:
851             proc.poll()
852         else:
853             proc.wait()
854         return (stdout, stderr)
855