Fixing typo in comments
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None,
209         gwuser = None,
210         gw = None, 
211         agent = True,
212         sudo = False,
213         stdin = None,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         timeout = None,
219         retry = 3,
220         err_on_timeout = True,
221         connect_timeout = 30,
222         persistent = True,
223         forward_x11 = False,
224         blocking = True,
225         strict_host_checking = True):
226     """
227     Executes a remote command, returns ((stdout,stderr),process)
228     """
229     
230     tmp_known_hosts = None
231     if not gw:
232         hostip = gethostbyname(host)
233     else: hostip = None
234
235     args = ['ssh', '-C',
236             # Don't bother with localhost. Makes test easier
237             '-o', 'NoHostAuthenticationForLocalhost=yes',
238             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239             '-o', 'ConnectionAttempts=3',
240             '-o', 'ServerAliveInterval=30',
241             '-o', 'TCPKeepAlive=yes',
242             '-o', 'Batchmode=yes',
243             '-l', user, hostip or host]
244
245     if persistent and openssh_has_persist():
246         args.extend([
247             '-o', 'ControlMaster=auto',
248             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
249             '-o', 'ControlPersist=60' ])
250
251     if not strict_host_checking:
252         # Do not check for Host key. Unsafe.
253         args.extend(['-o', 'StrictHostKeyChecking=no'])
254
255     if gw:
256         if gwuser:
257             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
258         else:
259             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
260         args.extend(['-o', proxycommand])
261
262     if agent:
263         args.append('-A')
264
265     if port:
266         args.append('-p%d' % port)
267
268     if identity:
269         args.extend(('-i', identity))
270
271     if tty:
272         args.append('-t')
273         args.append('-t')
274
275     if forward_x11:
276         args.append('-X')
277
278     if server_key:
279         # Create a temporary server key file
280         tmp_known_hosts = make_server_key_args(server_key, host, port)
281         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
282
283     if sudo:
284         command = "sudo " + command
285
286     args.append(command)
287
288     for x in xrange(retry):
289         # connects to the remote host and starts a remote connection
290         proc = subprocess.Popen(args,
291                 env = env,
292                 stdout = subprocess.PIPE,
293                 stdin = subprocess.PIPE, 
294                 stderr = subprocess.PIPE)
295        
296         # attach tempfile object to the process, to make sure the file stays
297         # alive until the process is finished with it
298         proc._known_hosts = tmp_known_hosts
299     
300         # by default, rexec calls _communicate which will block 
301         # until the process has exit. The argument block == False 
302         # forces to rexec to return immediately, without blocking 
303         try:
304             if blocking:
305                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
306             else:
307                 err = proc.stderr.read()
308                 out = proc.stdout.read()
309
310             msg = " rexec - host %s - command %s " % (host, " ".join(args))
311             log(msg, logging.DEBUG, out, err)
312
313             if proc.poll():
314                 skip = False
315
316                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
317                     # SSH error, can safely retry
318                     skip = True 
319                 elif retry:
320                     # Probably timed out or plain failed but can retry
321                     skip = True 
322                 
323                 if skip:
324                     t = x*2
325                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
326                             t, x, host, " ".join(args))
327                     log(msg, logging.DEBUG)
328
329                     time.sleep(t)
330                     continue
331             break
332         except RuntimeError, e:
333             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
334             log(msg, logging.DEBUG, out, err)
335
336             if retry <= 0:
337                 raise
338             retry -= 1
339         
340     return ((out, err), proc)
341
342 def rcopy(source, dest,
343         port = None,
344         gwuser = None,
345         gw = None,
346         agent = True, 
347         recursive = False,
348         identity = None,
349         server_key = None,
350         retry = 3,
351         strict_host_checking = True):
352     """
353     Copies from/to remote sites.
354     
355     Source and destination should have the user and host encoded
356     as per scp specs.
357     
358     Source can be a list of files to copy to a single destination, 
359     (in which case it is advised that the destination be a folder),
360     a single file in a string or a semi-colon separated list of files
361     in a string.
362     """
363
364     # Parse destination as <user>@<server>:<path>
365     if isinstance(dest, str) and ':' in dest:
366         remspec, path = dest.split(':',1)
367     elif isinstance(source, str) and ':' in source:
368         remspec, path = source.split(':',1)
369     else:
370         raise ValueError, "Both endpoints cannot be local"
371     user,host = remspec.rsplit('@',1)
372     
373     # plain scp
374     tmp_known_hosts = None
375
376     args = ['scp', '-q', '-p', '-C',
377             # Speed up transfer using blowfish cypher specification which is 
378             # faster than the default one (3des)
379             '-c', 'blowfish',
380             # Don't bother with localhost. Makes test easier
381             '-o', 'NoHostAuthenticationForLocalhost=yes',
382             '-o', 'ConnectTimeout=60',
383             '-o', 'ConnectionAttempts=3',
384             '-o', 'ServerAliveInterval=30',
385             '-o', 'TCPKeepAlive=yes' ]
386             
387     if port:
388         args.append('-P%d' % port)
389
390     if gw:
391         if gwuser:
392             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
393         else:
394             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
395         args.extend(['-o', proxycommand])
396
397     if recursive:
398         args.append('-r')
399
400     if identity:
401         args.extend(('-i', identity))
402
403     if server_key:
404         # Create a temporary server key file
405         tmp_known_hosts = make_server_key_args(server_key, host, port)
406         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
407
408     if not strict_host_checking:
409         # Do not check for Host key. Unsafe.
410         args.extend(['-o', 'StrictHostKeyChecking=no'])
411
412     if openssh_has_persist():
413         args.extend([
414             '-o', 'ControlMaster=auto',
415             '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
416             ])
417
418     if isinstance(dest, str):
419         dest = map(str.strip, dest.split(";"))
420
421     if isinstance(source, str):
422         source = map(str.strip, source.split(";"))
423
424     args.extend(source)
425
426     args.extend(dest)
427
428     for x in xrange(retry):
429         # connects to the remote host and starts a remote connection
430         proc = subprocess.Popen(args,
431                 stdout = subprocess.PIPE,
432                 stdin = subprocess.PIPE, 
433                 stderr = subprocess.PIPE)
434         
435         # attach tempfile object to the process, to make sure the file stays
436         # alive until the process is finished with it
437         proc._known_hosts = tmp_known_hosts
438     
439         try:
440             (out, err) = proc.communicate()
441             eintr_retry(proc.wait)()
442             msg = " rcopy - host %s - command %s " % (host, " ".join(args))
443             log(msg, logging.DEBUG, out, err)
444
445             if proc.poll():
446                 t = x*2
447                 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
448                         t, x, host, " ".join(args))
449                 log(msg, logging.DEBUG)
450
451                 time.sleep(t)
452                 continue
453
454             break
455         except RuntimeError, e:
456             msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
457             log(msg, logging.DEBUG, out, err)
458
459             if retry <= 0:
460                 raise
461             retry -= 1
462         
463     return ((out, err), proc)
464
465 def rspawn(command, pidfile, 
466         stdout = '/dev/null', 
467         stderr = STDOUT, 
468         stdin = '/dev/null',
469         home = None, 
470         create_home = False, 
471         sudo = False,
472         host = None, 
473         port = None, 
474         user = None, 
475         gwuser = None,
476         gw = None,
477         agent = None, 
478         identity = None, 
479         server_key = None,
480         tty = False):
481     """
482     Spawn a remote command such that it will continue working asynchronously in 
483     background. 
484
485         :param command: The command to run, it should be a single line.
486         :type command: str
487
488         :param pidfile: Path to a file where to store the pid and ppid of the 
489                         spawned process
490         :type pidfile: str
491
492         :param stdout: Path to file to redirect standard output. 
493                        The default value is /dev/null
494         :type stdout: str
495
496         :param stderr: Path to file to redirect standard error.
497                        If the special STDOUT value is used, stderr will 
498                        be redirected to the same file as stdout
499         :type stderr: str
500
501         :param stdin: Path to a file with input to be piped into the command's standard input
502         :type stdin: str
503
504         :param home: Path to working directory folder. 
505                     It is assumed to exist unless the create_home flag is set.
506         :type home: str
507
508         :param create_home: Flag to force creation of the home folder before 
509                             running the command
510         :type create_home: bool
511  
512         :param sudo: Flag forcing execution with sudo user
513         :type sudo: bool
514         
515         :rtype: touple
516
517         (stdout, stderr), process
518         
519         Of the spawning process, which only captures errors at spawning time.
520         Usually only useful for diagnostics.
521     """
522     # Start process in a "daemonized" way, using nohup and heavy
523     # stdin/out redirection to avoid connection issues
524     if stderr is STDOUT:
525         stderr = '&1'
526     else:
527         stderr = ' ' + stderr
528     
529     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
530         'command' : command,
531         'pidfile' : shell_escape(pidfile),
532         'stdout' : stdout,
533         'stderr' : stderr,
534         'stdin' : stdin,
535     }
536     
537     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
538             'command' : shell_escape(daemon_command),
539             'sudo' : 'sudo -S' if sudo else '',
540             'pidfile' : shell_escape(pidfile),
541             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
542             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
543         }
544
545     (out,err),proc = rexec(
546         cmd,
547         host = host,
548         port = port,
549         user = user,
550         gwuser = gwuser,
551         gw = gw,
552         agent = agent,
553         identity = identity,
554         server_key = server_key,
555         tty = tty ,
556         )
557     
558     if proc.wait():
559         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
560
561     return ((out, err), proc)
562
563 @eintr_retry
564 def rgetpid(pidfile,
565         host = None, 
566         port = None, 
567         user = None, 
568         gwuser = None,
569         gw = None,
570         agent = None, 
571         identity = None,
572         server_key = None):
573     """
574     Returns the pid and ppid of a process from a remote file where the 
575     information was stored.
576
577         :param home: Path to directory where the pidfile is located
578         :type home: str
579
580         :param pidfile: Name of file containing the pid information
581         :type pidfile: str
582         
583         :rtype: int
584         
585         A (pid, ppid) tuple useful for calling rstatus and rkill,
586         or None if the pidfile isn't valid yet (can happen when process is staring up)
587
588     """
589     (out,err),proc = rexec(
590         "cat %(pidfile)s" % {
591             'pidfile' : pidfile,
592         },
593         host = host,
594         port = port,
595         user = user,
596         gwuser = gwuser,
597         gw = gw,
598         agent = agent,
599         identity = identity,
600         server_key = server_key
601         )
602         
603     if proc.wait():
604         return None
605     
606     if out:
607         try:
608             return map(int,out.strip().split(' ',1))
609         except:
610             # Ignore, many ways to fail that don't matter that much
611             return None
612
613 @eintr_retry
614 def rstatus(pid, ppid, 
615         host = None, 
616         port = None, 
617         user = None, 
618         gwuser = None,
619         gw = None,
620         agent = None, 
621         identity = None,
622         server_key = None):
623     """
624     Returns a code representing the the status of a remote process
625
626         :param pid: Process id of the process
627         :type pid: int
628
629         :param ppid: Parent process id of process
630         :type ppid: int
631     
632         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
633     
634     """
635     (out,err),proc = rexec(
636         # Check only by pid. pid+ppid does not always work (especially with sudo) 
637         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
638             'ppid' : ppid,
639             'pid' : pid,
640         },
641         host = host,
642         port = port,
643         user = user,
644         gwuser = gwuser,
645         gw = gw,
646         agent = agent,
647         identity = identity,
648         server_key = server_key
649         )
650     
651     if proc.wait():
652         return ProcStatus.NOT_STARTED
653     
654     status = False
655     if err:
656         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
657             status = True
658     elif out:
659         status = (out.strip() == 'wait')
660     else:
661         return ProcStatus.NOT_STARTED
662     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
663
664 @eintr_retry
665 def rkill(pid, ppid,
666         host = None, 
667         port = None, 
668         user = None, 
669         gwuser = None,
670         gw = None,
671         agent = None, 
672         sudo = False,
673         identity = None, 
674         server_key = None, 
675         nowait = False):
676     """
677     Sends a kill signal to a remote process.
678
679     First tries a SIGTERM, and if the process does not end in 10 seconds,
680     it sends a SIGKILL.
681  
682         :param pid: Process id of process to be killed
683         :type pid: int
684
685         :param ppid: Parent process id of process to be killed
686         :type ppid: int
687
688         :param sudo: Flag indicating if sudo should be used to kill the process
689         :type sudo: bool
690         
691     """
692     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
693     cmd = """
694 SUBKILL="%(subkill)s" ;
695 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
696 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
697 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
698     sleep 0.2 
699     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
700         break
701     else
702         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
703         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
704     fi
705     sleep 1.8
706 done
707 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
708     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
709     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
710 fi
711 """
712     if nowait:
713         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
714
715     (out,err),proc = rexec(
716         cmd % {
717             'ppid' : ppid,
718             'pid' : pid,
719             'sudo' : 'sudo -S' if sudo else '',
720             'subkill' : subkill,
721         },
722         host = host,
723         port = port,
724         user = user,
725         gwuser = gwuser,
726         gw = gw,
727         agent = agent,
728         identity = identity,
729         server_key = server_key
730         )
731     
732     # wait, don't leave zombies around
733     proc.wait()
734
735     return (out, err), proc
736
737 # POSIX
738 def _communicate(proc, input, timeout=None, err_on_timeout=True):
739     read_set = []
740     write_set = []
741     stdout = None # Return
742     stderr = None # Return
743     
744     killed = False
745     
746     if timeout is not None:
747         timelimit = time.time() + timeout
748         killtime = timelimit + 4
749         bailtime = timelimit + 4
750
751     if proc.stdin:
752         # Flush stdio buffer.  This might block, if the user has
753         # been writing to .stdin in an uncontrolled fashion.
754         proc.stdin.flush()
755         if input:
756             write_set.append(proc.stdin)
757         else:
758             proc.stdin.close()
759
760     if proc.stdout:
761         read_set.append(proc.stdout)
762         stdout = []
763
764     if proc.stderr:
765         read_set.append(proc.stderr)
766         stderr = []
767
768     input_offset = 0
769     while read_set or write_set:
770         if timeout is not None:
771             curtime = time.time()
772             if timeout is None or curtime > timelimit:
773                 if curtime > bailtime:
774                     break
775                 elif curtime > killtime:
776                     signum = signal.SIGKILL
777                 else:
778                     signum = signal.SIGTERM
779                 # Lets kill it
780                 os.kill(proc.pid, signum)
781                 select_timeout = 0.5
782             else:
783                 select_timeout = timelimit - curtime + 0.1
784         else:
785             select_timeout = 1.0
786         
787         if select_timeout > 1.0:
788             select_timeout = 1.0
789             
790         try:
791             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
792         except select.error,e:
793             if e[0] != 4:
794                 raise
795             else:
796                 continue
797         
798         if not rlist and not wlist and not xlist and proc.poll() is not None:
799             # timeout and process exited, say bye
800             break
801
802         if proc.stdin in wlist:
803             # When select has indicated that the file is writable,
804             # we can write up to PIPE_BUF bytes without risk
805             # blocking.  POSIX defines PIPE_BUF >= 512
806             bytes_written = os.write(proc.stdin.fileno(),
807                     buffer(input, input_offset, 512))
808             input_offset += bytes_written
809
810             if input_offset >= len(input):
811                 proc.stdin.close()
812                 write_set.remove(proc.stdin)
813
814         if proc.stdout in rlist:
815             data = os.read(proc.stdout.fileno(), 1024)
816             if data == "":
817                 proc.stdout.close()
818                 read_set.remove(proc.stdout)
819             stdout.append(data)
820
821         if proc.stderr in rlist:
822             data = os.read(proc.stderr.fileno(), 1024)
823             if data == "":
824                 proc.stderr.close()
825                 read_set.remove(proc.stderr)
826             stderr.append(data)
827     
828     # All data exchanged.  Translate lists into strings.
829     if stdout is not None:
830         stdout = ''.join(stdout)
831     if stderr is not None:
832         stderr = ''.join(stderr)
833
834     # Translate newlines, if requested.  We cannot let the file
835     # object do the translation: It is based on stdio, which is
836     # impossible to combine with select (unless forcing no
837     # buffering).
838     if proc.universal_newlines and hasattr(file, 'newlines'):
839         if stdout:
840             stdout = proc._translate_newlines(stdout)
841         if stderr:
842             stderr = proc._translate_newlines(stderr)
843
844     if killed and err_on_timeout:
845         errcode = proc.poll()
846         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
847     else:
848         if killed:
849             proc.poll()
850         else:
851             proc.wait()
852         return (stdout, stderr)
853