Fix #29 LinuxApplication passing a list of files as 'sources' not working
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 import base64
22 import errno
23 import hashlib
24 import logging
25 import os
26 import os.path
27 import re
28 import select
29 import signal
30 import socket
31 import subprocess
32 import threading
33 import time
34 import tempfile
35
36 logger = logging.getLogger("sshfuncs")
37
38 def log(msg, level, out = None, err = None):
39     if out:
40         msg += " - OUT: %s " % out
41
42     if err:
43         msg += " - ERROR: %s " % err
44
45     logger.log(level, msg)
46
47
48 if hasattr(os, "devnull"):
49     DEV_NULL = os.devnull
50 else:
51     DEV_NULL = "/dev/null"
52
53 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
54
55 class STDOUT: 
56     """
57     Special value that when given to rspawn in stderr causes stderr to 
58     redirect to whatever stdout was redirected to.
59     """
60
61 class ProcStatus:
62     """
63     Codes for status of remote spawned process
64     """
65     # Process is still running
66     RUNNING = 1
67
68     # Process is finished
69     FINISHED = 2
70     
71     # Process hasn't started running yet (this should be very rare)
72     NOT_STARTED = 3
73
74 hostbyname_cache = dict()
75 hostbyname_cache_lock = threading.Lock()
76
77 def gethostbyname(host):
78     global hostbyname_cache
79     global hostbyname_cache_lock
80     
81     hostbyname = hostbyname_cache.get(host)
82     if not hostbyname:
83         with hostbyname_cache_lock:
84             hostbyname = socket.gethostbyname(host)
85             hostbyname_cache[host] = hostbyname
86
87             msg = " Added hostbyname %s - %s " % (host, hostbyname)
88             log(msg, logging.DEBUG)
89
90     return hostbyname
91
92 OPENSSH_HAS_PERSIST = None
93
94 def openssh_has_persist():
95     """ The ssh_config options ControlMaster and ControlPersist allow to
96     reuse a same network connection for multiple ssh sessions. In this 
97     way limitations on number of open ssh connections can be bypassed.
98     However, older versions of openSSH do not support this feature.
99     This function is used to determine if ssh connection persist features
100     can be used.
101     """
102     global OPENSSH_HAS_PERSIST
103     if OPENSSH_HAS_PERSIST is None:
104         proc = subprocess.Popen(["ssh","-v"],
105             stdout = subprocess.PIPE,
106             stderr = subprocess.STDOUT,
107             stdin = open("/dev/null","r") )
108         out,err = proc.communicate()
109         proc.wait()
110         
111         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
112         OPENSSH_HAS_PERSIST = bool(vre.match(out))
113     return OPENSSH_HAS_PERSIST
114
115 def make_server_key_args(server_key, host, port):
116     """ Returns a reference to a temporary known_hosts file, to which 
117     the server key has been added. 
118     
119     Make sure to hold onto the temp file reference until the process is 
120     done with it
121
122     :param server_key: the server public key
123     :type server_key: str
124
125     :param host: the hostname
126     :type host: str
127
128     :param port: the ssh port
129     :type port: str
130
131     """
132     if port is not None:
133         host = '%s:%s' % (host, str(port))
134
135     # Create a temporary server key file
136     tmp_known_hosts = tempfile.NamedTemporaryFile()
137    
138     hostbyname = gethostbyname(host) 
139
140     # Add the intended host key
141     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
142     
143     # If we're not in strict mode, add user-configured keys
144     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
145         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
146         if os.access(user_hosts_path, os.R_OK):
147             f = open(user_hosts_path, "r")
148             tmp_known_hosts.write(f.read())
149             f.close()
150         
151     tmp_known_hosts.flush()
152     
153     return tmp_known_hosts
154
155 def make_control_path(agent, forward_x11):
156     ctrl_path = "/tmp/nepi_ssh"
157
158     if agent:
159         ctrl_path +="_a"
160
161     if forward_x11:
162         ctrl_path +="_x"
163
164     ctrl_path += "-%r@%h:%p"
165
166     return ctrl_path
167
168 def shell_escape(s):
169     """ Escapes strings so that they are safe to use as command-line 
170     arguments """
171     if SHELL_SAFE.match(s):
172         # safe string - no escaping needed
173         return s
174     else:
175         # unsafe string - escape
176         def escp(c):
177             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
178                 return c
179             else:
180                 return "'$'\\x%02x''" % (ord(c),)
181         s = ''.join(map(escp,s))
182         return "'%s'" % (s,)
183
184 def eintr_retry(func):
185     """Retries a function invocation when a EINTR occurs"""
186     import functools
187     @functools.wraps(func)
188     def rv(*p, **kw):
189         retry = kw.pop("_retry", False)
190         for i in xrange(0 if retry else 4):
191             try:
192                 return func(*p, **kw)
193             except (select.error, socket.error), args:
194                 if args[0] == errno.EINTR:
195                     continue
196                 else:
197                     raise 
198             except OSError, e:
199                 if e.errno == errno.EINTR:
200                     continue
201                 else:
202                     raise
203         else:
204             return func(*p, **kw)
205     return rv
206
207 def rexec(command, host, user, 
208         port = None,
209         gwuser = None,
210         gw = None, 
211         agent = True,
212         sudo = False,
213         stdin = None,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         timeout = None,
219         retry = 3,
220         err_on_timeout = True,
221         connect_timeout = 30,
222         persistent = True,
223         forward_x11 = False,
224         blocking = True,
225         strict_host_checking = True):
226     """
227     Executes a remote command, returns ((stdout,stderr),process)
228     """
229     
230     tmp_known_hosts = None
231     if not gw:
232         hostip = gethostbyname(host)
233     else: hostip = None
234
235     args = ['ssh', '-C',
236             # Don't bother with localhost. Makes test easier
237             '-o', 'NoHostAuthenticationForLocalhost=yes',
238             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
239             '-o', 'ConnectionAttempts=3',
240             '-o', 'ServerAliveInterval=30',
241             '-o', 'TCPKeepAlive=yes',
242             '-o', 'Batchmode=yes',
243             '-l', user, hostip or host]
244
245     if persistent and openssh_has_persist():
246         args.extend([
247             '-o', 'ControlMaster=auto',
248             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
249             '-o', 'ControlPersist=60' ])
250
251     if not strict_host_checking:
252         # Do not check for Host key. Unsafe.
253         args.extend(['-o', 'StrictHostKeyChecking=no'])
254
255     if gw:
256         if gwuser:
257             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
258         else:
259             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
260         args.extend(['-o', proxycommand])
261
262     if agent:
263         args.append('-A')
264
265     if port:
266         args.append('-p%d' % port)
267
268     if identity:
269         args.extend(('-i', identity))
270
271     if tty:
272         args.append('-t')
273         args.append('-t')
274
275     if forward_x11:
276         args.append('-X')
277
278     if server_key:
279         # Create a temporary server key file
280         tmp_known_hosts = make_server_key_args(server_key, host, port)
281         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
282
283     if sudo:
284         command = "sudo " + command
285
286     args.append(command)
287
288     for x in xrange(retry):
289         # connects to the remote host and starts a remote connection
290         proc = subprocess.Popen(args,
291                 env = env,
292                 stdout = subprocess.PIPE,
293                 stdin = subprocess.PIPE, 
294                 stderr = subprocess.PIPE)
295        
296         # attach tempfile object to the process, to make sure the file stays
297         # alive until the process is finished with it
298         proc._known_hosts = tmp_known_hosts
299     
300         # by default, rexec calls _communicate which will block 
301         # until the process has exit. The argument block == False 
302         # forces to rexec to return immediately, without blocking 
303         try:
304             if blocking:
305                 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
306             else:
307                 err = proc.stderr.read()
308                 out = proc.stdout.read()
309
310             msg = " rexec - host %s - command %s " % (host, " ".join(args))
311             log(msg, logging.DEBUG, out, err)
312
313             if proc.poll():
314                 skip = False
315
316                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
317                     # SSH error, can safely retry
318                     skip = True 
319                 elif retry:
320                     # Probably timed out or plain failed but can retry
321                     skip = True 
322                 
323                 if skip:
324                     t = x*2
325                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
326                             t, x, host, " ".join(args))
327                     log(msg, logging.DEBUG)
328
329                     time.sleep(t)
330                     continue
331             break
332         except RuntimeError, e:
333             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
334             log(msg, logging.DEBUG, out, err)
335
336             if retry <= 0:
337                 raise
338             retry -= 1
339         
340     return ((out, err), proc)
341
342 def rcopy(source, dest,
343         port = None,
344         gwuser = None,
345         gw = None,
346         agent = True, 
347         recursive = False,
348         identity = None,
349         server_key = None,
350         retry = 3,
351         strict_host_checking = True):
352     """
353     Copies from/to remote sites.
354     
355     Source and destination should have the user and host encoded
356     as per scp specs.
357     
358     Source can be a list of files to copy to a single destination, 
359     (in which case it is advised that the destination be a folder),
360     a single string or a string list of colon-separeted files.
361     """
362
363     # Parse destination as <user>@<server>:<path>
364     if isinstance(dest, str) and ':' in dest:
365         remspec, path = dest.split(':',1)
366     elif isinstance(source, str) and ':' in source:
367         remspec, path = source.split(':',1)
368     else:
369         raise ValueError, "Both endpoints cannot be local"
370     user,host = remspec.rsplit('@',1)
371     
372     # plain scp
373     tmp_known_hosts = None
374
375     args = ['scp', '-q', '-p', '-C',
376             # Speed up transfer using blowfish cypher specification which is 
377             # faster than the default one (3des)
378             '-c', 'blowfish',
379             # Don't bother with localhost. Makes test easier
380             '-o', 'NoHostAuthenticationForLocalhost=yes',
381             '-o', 'ConnectTimeout=60',
382             '-o', 'ConnectionAttempts=3',
383             '-o', 'ServerAliveInterval=30',
384             '-o', 'TCPKeepAlive=yes' ]
385             
386     if port:
387         args.append('-P%d' % port)
388
389     if gw:
390         if gwuser:
391             proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
392         else:
393             proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
394         args.extend(['-o', proxycommand])
395
396     if recursive:
397         args.append('-r')
398
399     if identity:
400         args.extend(('-i', identity))
401
402     if server_key:
403         # Create a temporary server key file
404         tmp_known_hosts = make_server_key_args(server_key, host, port)
405         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
406
407     if not strict_host_checking:
408         # Do not check for Host key. Unsafe.
409         args.extend(['-o', 'StrictHostKeyChecking=no'])
410
411     if openssh_has_persist():
412         args.extend([
413             '-o', 'ControlMaster=auto',
414             '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
415             ])
416
417     if isinstance(dest, str):
418         dest = map(str.strip, dest.split(";"))
419
420     if isinstance(source, str):
421         source = map(str.strip, source.split(";"))
422
423     args.extend(source)
424
425     args.extend(dest)
426
427     for x in xrange(retry):
428         # connects to the remote host and starts a remote connection
429         proc = subprocess.Popen(args,
430                 stdout = subprocess.PIPE,
431                 stdin = subprocess.PIPE, 
432                 stderr = subprocess.PIPE)
433         
434         # attach tempfile object to the process, to make sure the file stays
435         # alive until the process is finished with it
436         proc._known_hosts = tmp_known_hosts
437     
438         try:
439             (out, err) = proc.communicate()
440             eintr_retry(proc.wait)()
441             msg = " rcopy - host %s - command %s " % (host, " ".join(args))
442             log(msg, logging.DEBUG, out, err)
443
444             if proc.poll():
445                 t = x*2
446                 msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
447                         t, x, host, " ".join(args))
448                 log(msg, logging.DEBUG)
449
450                 time.sleep(t)
451                 continue
452
453             break
454         except RuntimeError, e:
455             msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
456             log(msg, logging.DEBUG, out, err)
457
458             if retry <= 0:
459                 raise
460             retry -= 1
461         
462     return ((out, err), proc)
463
464 def rspawn(command, pidfile, 
465         stdout = '/dev/null', 
466         stderr = STDOUT, 
467         stdin = '/dev/null',
468         home = None, 
469         create_home = False, 
470         sudo = False,
471         host = None, 
472         port = None, 
473         user = None, 
474         gwuser = None,
475         gw = None,
476         agent = None, 
477         identity = None, 
478         server_key = None,
479         tty = False):
480     """
481     Spawn a remote command such that it will continue working asynchronously in 
482     background. 
483
484         :param command: The command to run, it should be a single line.
485         :type command: str
486
487         :param pidfile: Path to a file where to store the pid and ppid of the 
488                         spawned process
489         :type pidfile: str
490
491         :param stdout: Path to file to redirect standard output. 
492                        The default value is /dev/null
493         :type stdout: str
494
495         :param stderr: Path to file to redirect standard error.
496                        If the special STDOUT value is used, stderr will 
497                        be redirected to the same file as stdout
498         :type stderr: str
499
500         :param stdin: Path to a file with input to be piped into the command's standard input
501         :type stdin: str
502
503         :param home: Path to working directory folder. 
504                     It is assumed to exist unless the create_home flag is set.
505         :type home: str
506
507         :param create_home: Flag to force creation of the home folder before 
508                             running the command
509         :type create_home: bool
510  
511         :param sudo: Flag forcing execution with sudo user
512         :type sudo: bool
513         
514         :rtype: touple
515
516         (stdout, stderr), process
517         
518         Of the spawning process, which only captures errors at spawning time.
519         Usually only useful for diagnostics.
520     """
521     # Start process in a "daemonized" way, using nohup and heavy
522     # stdin/out redirection to avoid connection issues
523     if stderr is STDOUT:
524         stderr = '&1'
525     else:
526         stderr = ' ' + stderr
527     
528     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
529         'command' : command,
530         'pidfile' : shell_escape(pidfile),
531         'stdout' : stdout,
532         'stderr' : stderr,
533         'stdin' : stdin,
534     }
535     
536     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
537             'command' : shell_escape(daemon_command),
538             'sudo' : 'sudo -S' if sudo else '',
539             'pidfile' : shell_escape(pidfile),
540             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
541             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
542         }
543
544     (out,err),proc = rexec(
545         cmd,
546         host = host,
547         port = port,
548         user = user,
549         gwuser = gwuser,
550         gw = gw,
551         agent = agent,
552         identity = identity,
553         server_key = server_key,
554         tty = tty ,
555         )
556     
557     if proc.wait():
558         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
559
560     return ((out, err), proc)
561
562 @eintr_retry
563 def rgetpid(pidfile,
564         host = None, 
565         port = None, 
566         user = None, 
567         gwuser = None,
568         gw = None,
569         agent = None, 
570         identity = None,
571         server_key = None):
572     """
573     Returns the pid and ppid of a process from a remote file where the 
574     information was stored.
575
576         :param home: Path to directory where the pidfile is located
577         :type home: str
578
579         :param pidfile: Name of file containing the pid information
580         :type pidfile: str
581         
582         :rtype: int
583         
584         A (pid, ppid) tuple useful for calling rstatus and rkill,
585         or None if the pidfile isn't valid yet (can happen when process is staring up)
586
587     """
588     (out,err),proc = rexec(
589         "cat %(pidfile)s" % {
590             'pidfile' : pidfile,
591         },
592         host = host,
593         port = port,
594         user = user,
595         gwuser = gwuser,
596         gw = gw,
597         agent = agent,
598         identity = identity,
599         server_key = server_key
600         )
601         
602     if proc.wait():
603         return None
604     
605     if out:
606         try:
607             return map(int,out.strip().split(' ',1))
608         except:
609             # Ignore, many ways to fail that don't matter that much
610             return None
611
612 @eintr_retry
613 def rstatus(pid, ppid, 
614         host = None, 
615         port = None, 
616         user = None, 
617         gwuser = None,
618         gw = None,
619         agent = None, 
620         identity = None,
621         server_key = None):
622     """
623     Returns a code representing the the status of a remote process
624
625         :param pid: Process id of the process
626         :type pid: int
627
628         :param ppid: Parent process id of process
629         :type ppid: int
630     
631         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
632     
633     """
634     (out,err),proc = rexec(
635         # Check only by pid. pid+ppid does not always work (especially with sudo) 
636         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
637             'ppid' : ppid,
638             'pid' : pid,
639         },
640         host = host,
641         port = port,
642         user = user,
643         gwuser = gwuser,
644         gw = gw,
645         agent = agent,
646         identity = identity,
647         server_key = server_key
648         )
649     
650     if proc.wait():
651         return ProcStatus.NOT_STARTED
652     
653     status = False
654     if err:
655         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
656             status = True
657     elif out:
658         status = (out.strip() == 'wait')
659     else:
660         return ProcStatus.NOT_STARTED
661     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
662
663 @eintr_retry
664 def rkill(pid, ppid,
665         host = None, 
666         port = None, 
667         user = None, 
668         gwuser = None,
669         gw = None,
670         agent = None, 
671         sudo = False,
672         identity = None, 
673         server_key = None, 
674         nowait = False):
675     """
676     Sends a kill signal to a remote process.
677
678     First tries a SIGTERM, and if the process does not end in 10 seconds,
679     it sends a SIGKILL.
680  
681         :param pid: Process id of process to be killed
682         :type pid: int
683
684         :param ppid: Parent process id of process to be killed
685         :type ppid: int
686
687         :param sudo: Flag indicating if sudo should be used to kill the process
688         :type sudo: bool
689         
690     """
691     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
692     cmd = """
693 SUBKILL="%(subkill)s" ;
694 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
695 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
696 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
697     sleep 0.2 
698     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
699         break
700     else
701         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
702         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
703     fi
704     sleep 1.8
705 done
706 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
707     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
708     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
709 fi
710 """
711     if nowait:
712         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
713
714     (out,err),proc = rexec(
715         cmd % {
716             'ppid' : ppid,
717             'pid' : pid,
718             'sudo' : 'sudo -S' if sudo else '',
719             'subkill' : subkill,
720         },
721         host = host,
722         port = port,
723         user = user,
724         gwuser = gwuser,
725         gw = gw,
726         agent = agent,
727         identity = identity,
728         server_key = server_key
729         )
730     
731     # wait, don't leave zombies around
732     proc.wait()
733
734     return (out, err), proc
735
736 # POSIX
737 def _communicate(proc, input, timeout=None, err_on_timeout=True):
738     read_set = []
739     write_set = []
740     stdout = None # Return
741     stderr = None # Return
742     
743     killed = False
744     
745     if timeout is not None:
746         timelimit = time.time() + timeout
747         killtime = timelimit + 4
748         bailtime = timelimit + 4
749
750     if proc.stdin:
751         # Flush stdio buffer.  This might block, if the user has
752         # been writing to .stdin in an uncontrolled fashion.
753         proc.stdin.flush()
754         if input:
755             write_set.append(proc.stdin)
756         else:
757             proc.stdin.close()
758
759     if proc.stdout:
760         read_set.append(proc.stdout)
761         stdout = []
762
763     if proc.stderr:
764         read_set.append(proc.stderr)
765         stderr = []
766
767     input_offset = 0
768     while read_set or write_set:
769         if timeout is not None:
770             curtime = time.time()
771             if timeout is None or curtime > timelimit:
772                 if curtime > bailtime:
773                     break
774                 elif curtime > killtime:
775                     signum = signal.SIGKILL
776                 else:
777                     signum = signal.SIGTERM
778                 # Lets kill it
779                 os.kill(proc.pid, signum)
780                 select_timeout = 0.5
781             else:
782                 select_timeout = timelimit - curtime + 0.1
783         else:
784             select_timeout = 1.0
785         
786         if select_timeout > 1.0:
787             select_timeout = 1.0
788             
789         try:
790             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
791         except select.error,e:
792             if e[0] != 4:
793                 raise
794             else:
795                 continue
796         
797         if not rlist and not wlist and not xlist and proc.poll() is not None:
798             # timeout and process exited, say bye
799             break
800
801         if proc.stdin in wlist:
802             # When select has indicated that the file is writable,
803             # we can write up to PIPE_BUF bytes without risk
804             # blocking.  POSIX defines PIPE_BUF >= 512
805             bytes_written = os.write(proc.stdin.fileno(),
806                     buffer(input, input_offset, 512))
807             input_offset += bytes_written
808
809             if input_offset >= len(input):
810                 proc.stdin.close()
811                 write_set.remove(proc.stdin)
812
813         if proc.stdout in rlist:
814             data = os.read(proc.stdout.fileno(), 1024)
815             if data == "":
816                 proc.stdout.close()
817                 read_set.remove(proc.stdout)
818             stdout.append(data)
819
820         if proc.stderr in rlist:
821             data = os.read(proc.stderr.fileno(), 1024)
822             if data == "":
823                 proc.stderr.close()
824                 read_set.remove(proc.stderr)
825             stderr.append(data)
826     
827     # All data exchanged.  Translate lists into strings.
828     if stdout is not None:
829         stdout = ''.join(stdout)
830     if stderr is not None:
831         stderr = ''.join(stderr)
832
833     # Translate newlines, if requested.  We cannot let the file
834     # object do the translation: It is based on stdio, which is
835     # impossible to combine with select (unless forcing no
836     # buffering).
837     if proc.universal_newlines and hasattr(file, 'newlines'):
838         if stdout:
839             stdout = proc._translate_newlines(stdout)
840         if stderr:
841             stderr = proc._translate_newlines(stderr)
842
843     if killed and err_on_timeout:
844         errcode = proc.poll()
845         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
846     else:
847         if killed:
848             proc.poll()
849         else:
850             proc.wait()
851         return (stdout, stderr)
852