b06abdbd998ca73890ba5504dac758cd7e68e063
[nepi.git] / src / nepi / util / sshfuncs.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Claudio Freire <claudio-daniel.freire@inria.fr>
20
21 ## TODO: This code needs reviewing !!!
22
23 import base64
24 import errno
25 import hashlib
26 import logging
27 import os
28 import os.path
29 import re
30 import select
31 import signal
32 import socket
33 import subprocess
34 import threading
35 import time
36 import tempfile
37
38 logger = logging.getLogger("sshfuncs")
39
40 def log(msg, level, out = None, err = None):
41     if out:
42         msg += " - OUT: %s " % out
43
44     if err:
45         msg += " - ERROR: %s " % err
46
47     logger.log(level, msg)
48
49 if hasattr(os, "devnull"):
50     DEV_NULL = os.devnull
51 else:
52     DEV_NULL = "/dev/null"
53
54 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
55
56 class STDOUT: 
57     """
58     Special value that when given to rspawn in stderr causes stderr to 
59     redirect to whatever stdout was redirected to.
60     """
61
62 class ProcStatus:
63     """
64     Codes for status of remote spawned process
65     """
66     # Process is still running
67     RUNNING = 1
68
69     # Process is finished
70     FINISHED = 2
71     
72     # Process hasn't started running yet (this should be very rare)
73     NOT_STARTED = 3
74
75 hostbyname_cache = dict()
76 hostbyname_cache_lock = threading.Lock()
77
78 def gethostbyname(host):
79     global hostbyname_cache
80     global hostbyname_cache_lock
81     
82     hostbyname = hostbyname_cache.get(host)
83     if not hostbyname:
84         with hostbyname_cache_lock:
85             hostbyname = socket.gethostbyname(host)
86             hostbyname_cache[host] = hostbyname
87
88             msg = " Added hostbyname %s - %s " % (host, hostbyname)
89             log(msg, logging.DEBUG)
90
91     return hostbyname
92
93 OPENSSH_HAS_PERSIST = None
94
95 def openssh_has_persist():
96     """ The ssh_config options ControlMaster and ControlPersist allow to
97     reuse a same network connection for multiple ssh sessions. In this 
98     way limitations on number of open ssh connections can be bypassed.
99     However, older versions of openSSH do not support this feature.
100     This function is used to determine if ssh connection persist features
101     can be used.
102     """
103     global OPENSSH_HAS_PERSIST
104     if OPENSSH_HAS_PERSIST is None:
105         proc = subprocess.Popen(["ssh","-v"],
106             stdout = subprocess.PIPE,
107             stderr = subprocess.STDOUT,
108             stdin = open("/dev/null","r") )
109         out,err = proc.communicate()
110         proc.wait()
111         
112         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
113         OPENSSH_HAS_PERSIST = bool(vre.match(out))
114     return OPENSSH_HAS_PERSIST
115
116 def make_server_key_args(server_key, host, port):
117     """ Returns a reference to a temporary known_hosts file, to which 
118     the server key has been added. 
119     
120     Make sure to hold onto the temp file reference until the process is 
121     done with it
122
123     :param server_key: the server public key
124     :type server_key: str
125
126     :param host: the hostname
127     :type host: str
128
129     :param port: the ssh port
130     :type port: str
131
132     """
133     if port is not None:
134         host = '%s:%s' % (host, str(port))
135
136     # Create a temporary server key file
137     tmp_known_hosts = tempfile.NamedTemporaryFile()
138    
139     hostbyname = gethostbyname(host) 
140
141     # Add the intended host key
142     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
143     
144     # If we're not in strict mode, add user-configured keys
145     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
146         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
147         if os.access(user_hosts_path, os.R_OK):
148             f = open(user_hosts_path, "r")
149             tmp_known_hosts.write(f.read())
150             f.close()
151         
152     tmp_known_hosts.flush()
153     
154     return tmp_known_hosts
155
156 def make_control_path(agent, forward_x11):
157     ctrl_path = "/tmp/nepi_ssh"
158
159     if agent:
160         ctrl_path +="_a"
161
162     if forward_x11:
163         ctrl_path +="_x"
164
165     ctrl_path += "-%r@%h:%p"
166
167     return ctrl_path
168
169 def shell_escape(s):
170     """ Escapes strings so that they are safe to use as command-line 
171     arguments """
172     if SHELL_SAFE.match(s):
173         # safe string - no escaping needed
174         return s
175     else:
176         # unsafe string - escape
177         def escp(c):
178             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
179                 return c
180             else:
181                 return "'$'\\x%02x''" % (ord(c),)
182         s = ''.join(map(escp,s))
183         return "'%s'" % (s,)
184
185 def eintr_retry(func):
186     """Retries a function invocation when a EINTR occurs"""
187     import functools
188     @functools.wraps(func)
189     def rv(*p, **kw):
190         retry = kw.pop("_retry", False)
191         for i in xrange(0 if retry else 4):
192             try:
193                 return func(*p, **kw)
194             except (select.error, socket.error), args:
195                 if args[0] == errno.EINTR:
196                     continue
197                 else:
198                     raise 
199             except OSError, e:
200                 if e.errno == errno.EINTR:
201                     continue
202                 else:
203                     raise
204         else:
205             return func(*p, **kw)
206     return rv
207
208 def rexec(command, host, user, 
209         port = None,
210         gwuser = None,
211         gw = None, 
212         agent = True,
213         sudo = False,
214         identity = None,
215         server_key = None,
216         env = None,
217         tty = False,
218         connect_timeout = 30,
219         retry = 3,
220         persistent = True,
221         forward_x11 = False,
222         blocking = True,
223         strict_host_checking = True):
224     """
225     Executes a remote command, returns ((stdout,stderr),process)
226     """
227
228     tmp_known_hosts = None
229     if not gw:
230         hostip = gethostbyname(host)
231     else: hostip = None
232
233     args = ['ssh', '-C',
234             # Don't bother with localhost. Makes test easier
235             '-o', 'NoHostAuthenticationForLocalhost=yes',
236             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
237             '-o', 'ConnectionAttempts=3',
238             '-o', 'ServerAliveInterval=30',
239             '-o', 'TCPKeepAlive=yes',
240             '-o', 'Batchmode=yes',
241             '-l', user, hostip or host]
242
243     if persistent and openssh_has_persist():
244         args.extend([
245             '-o', 'ControlMaster=auto',
246             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
247             '-o', 'ControlPersist=60' ])
248
249     if not strict_host_checking:
250         # Do not check for Host key. Unsafe.
251         args.extend(['-o', 'StrictHostKeyChecking=no'])
252
253     if gw:
254         if gwuser:
255             proxycommand = 'ProxyCommand=ssh -q %s@%s -W %%h:%%p' % (gwuser, gw)
256         else:
257             proxycommand = 'ProxyCommand=ssh -q %%r@%s -W %%h:%%p' % gw
258         args.extend(['-o', proxycommand])
259
260     if agent:
261         args.append('-A')
262
263     if port:
264         args.append('-p%d' % port)
265
266     if identity:
267         identity = os.path.expanduser(identity)
268         args.extend(('-i', identity))
269
270     if tty:
271         args.append('-t')
272         args.append('-t')
273
274     if forward_x11:
275         args.append('-X')
276
277     if server_key:
278         # Create a temporary server key file
279         tmp_known_hosts = make_server_key_args(server_key, host, port)
280         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
281
282     if sudo:
283         command = "sudo " + command
284
285     args.append(command)
286
287     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
288
289     stdout = stderr = stdin = subprocess.PIPE
290     if forward_x11:
291         stdout = stderr = stdin = None
292
293     return _retry_rexec(args, log_msg, 
294             stderr = stderr,
295             stdin = stdin,
296             stdout = stdout,
297             env = env, 
298             retry = retry, 
299             tmp_known_hosts = tmp_known_hosts,
300             blocking = blocking)
301
302 def rcopy(source, dest,
303         port = None,
304         gwuser = None,
305         gw = None,
306         recursive = False,
307         identity = None,
308         server_key = None,
309         retry = 3,
310         strict_host_checking = True):
311     """
312     Copies from/to remote sites.
313     
314     Source and destination should have the user and host encoded
315     as per scp specs.
316     
317     Source can be a list of files to copy to a single destination, 
318     (in which case it is advised that the destination be a folder),
319     or a single file in a string.
320     """
321
322     # Parse destination as <user>@<server>:<path>
323     if isinstance(dest, str) and ':' in dest:
324         remspec, path = dest.split(':',1)
325     elif isinstance(source, str) and ':' in source:
326         remspec, path = source.split(':',1)
327     else:
328         raise ValueError, "Both endpoints cannot be local"
329     user,host = remspec.rsplit('@',1)
330     
331     # plain scp
332     tmp_known_hosts = None
333
334     args = ['scp', '-q', '-p', '-C',
335             # Speed up transfer using blowfish cypher specification which is 
336             # faster than the default one (3des)
337             '-c', 'blowfish',
338             # Don't bother with localhost. Makes test easier
339             '-o', 'NoHostAuthenticationForLocalhost=yes',
340             '-o', 'ConnectTimeout=60',
341             '-o', 'ConnectionAttempts=3',
342             '-o', 'ServerAliveInterval=30',
343             '-o', 'TCPKeepAlive=yes' ]
344             
345     if port:
346         args.append('-P%d' % port)
347
348     if gw:
349         if gwuser:
350             proxycommand = 'ProxyCommand=ssh -q %s@%s -W %%h:%%p' % (gwuser, gw)
351         else:
352             proxycommand = 'ProxyCommand=ssh -q %%r@%s -W %%h:%%p' % gw
353         args.extend(['-o', proxycommand])
354
355     if recursive:
356         args.append('-r')
357
358     if identity:
359         identity = os.path.expanduser(identity)
360         args.extend(('-i', identity))
361
362     if server_key:
363         # Create a temporary server key file
364         tmp_known_hosts = make_server_key_args(server_key, host, port)
365         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
366
367     if not strict_host_checking:
368         # Do not check for Host key. Unsafe.
369         args.extend(['-o', 'StrictHostKeyChecking=no'])
370     
371     if isinstance(source, list):
372         args.extend(source)
373     else:
374         if openssh_has_persist():
375             args.extend([
376                 '-o', 'ControlMaster=auto',
377                 '-o', 'ControlPath=%s' % (make_control_path(False, False),)
378                 ])
379         args.append(source)
380
381     if isinstance(dest, list):
382         args.extend(dest)
383     else:
384         args.append(dest)
385
386     log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
387     
388     return _retry_rexec(args, log_msg, env = None, retry = retry, 
389             tmp_known_hosts = tmp_known_hosts,
390             blocking = True)
391
392 def rspawn(command, pidfile, 
393         stdout = '/dev/null', 
394         stderr = STDOUT, 
395         stdin = '/dev/null',
396         home = None, 
397         create_home = False, 
398         sudo = False,
399         host = None, 
400         port = None, 
401         user = None, 
402         gwuser = None,
403         gw = None,
404         agent = None, 
405         identity = None, 
406         server_key = None,
407         tty = False):
408     """
409     Spawn a remote command such that it will continue working asynchronously in 
410     background. 
411
412         :param command: The command to run, it should be a single line.
413         :type command: str
414
415         :param pidfile: Path to a file where to store the pid and ppid of the 
416                         spawned process
417         :type pidfile: str
418
419         :param stdout: Path to file to redirect standard output. 
420                        The default value is /dev/null
421         :type stdout: str
422
423         :param stderr: Path to file to redirect standard error.
424                        If the special STDOUT value is used, stderr will 
425                        be redirected to the same file as stdout
426         :type stderr: str
427
428         :param stdin: Path to a file with input to be piped into the command's standard input
429         :type stdin: str
430
431         :param home: Path to working directory folder. 
432                     It is assumed to exist unless the create_home flag is set.
433         :type home: str
434
435         :param create_home: Flag to force creation of the home folder before 
436                             running the command
437         :type create_home: bool
438  
439         :param sudo: Flag forcing execution with sudo user
440         :type sudo: bool
441         
442         :rtype: touple
443
444         (stdout, stderr), process
445         
446         Of the spawning process, which only captures errors at spawning time.
447         Usually only useful for diagnostics.
448     """
449     # Start process in a "daemonized" way, using nohup and heavy
450     # stdin/out redirection to avoid connection issues
451     if stderr is STDOUT:
452         stderr = '&1'
453     else:
454         stderr = ' ' + stderr
455     
456     daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
457         'command' : command,
458         'pidfile' : shell_escape(pidfile),
459         'stdout' : stdout,
460         'stderr' : stderr,
461         'stdin' : stdin,
462     }
463     
464     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
465             'command' : shell_escape(daemon_command),
466             'sudo' : 'sudo -S' if sudo else '',
467             'pidfile' : shell_escape(pidfile),
468             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
469             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
470         }
471
472     (out,err),proc = rexec(
473         cmd,
474         host = host,
475         port = port,
476         user = user,
477         gwuser = gwuser,
478         gw = gw,
479         agent = agent,
480         identity = identity,
481         server_key = server_key,
482         tty = tty ,
483         )
484     
485     if proc.wait():
486         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
487
488     return ((out, err), proc)
489
490 @eintr_retry
491 def rgetpid(pidfile,
492         host = None, 
493         port = None, 
494         user = None, 
495         gwuser = None,
496         gw = None,
497         agent = None, 
498         identity = None,
499         server_key = None):
500     """
501     Returns the pid and ppid of a process from a remote file where the 
502     information was stored.
503
504         :param home: Path to directory where the pidfile is located
505         :type home: str
506
507         :param pidfile: Name of file containing the pid information
508         :type pidfile: str
509         
510         :rtype: int
511         
512         A (pid, ppid) tuple useful for calling rstatus and rkill,
513         or None if the pidfile isn't valid yet (can happen when process is staring up)
514
515     """
516     (out,err),proc = rexec(
517         "cat %(pidfile)s" % {
518             'pidfile' : pidfile,
519         },
520         host = host,
521         port = port,
522         user = user,
523         gwuser = gwuser,
524         gw = gw,
525         agent = agent,
526         identity = identity,
527         server_key = server_key
528         )
529         
530     if proc.wait():
531         return None
532     
533     if out:
534         try:
535             return map(int,out.strip().split(' ',1))
536         except:
537             # Ignore, many ways to fail that don't matter that much
538             return None
539
540 @eintr_retry
541 def rstatus(pid, ppid, 
542         host = None, 
543         port = None, 
544         user = None, 
545         gwuser = None,
546         gw = None,
547         agent = None, 
548         identity = None,
549         server_key = None):
550     """
551     Returns a code representing the the status of a remote process
552
553         :param pid: Process id of the process
554         :type pid: int
555
556         :param ppid: Parent process id of process
557         :type ppid: int
558     
559         :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
560     
561     """
562     (out,err),proc = rexec(
563         # Check only by pid. pid+ppid does not always work (especially with sudo) 
564         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
565             'ppid' : ppid,
566             'pid' : pid,
567         },
568         host = host,
569         port = port,
570         user = user,
571         gwuser = gwuser,
572         gw = gw,
573         agent = agent,
574         identity = identity,
575         server_key = server_key
576         )
577     
578     if proc.wait():
579         return ProcStatus.NOT_STARTED
580     
581     status = False
582     if err:
583         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
584             status = True
585     elif out:
586         status = (out.strip() == 'wait')
587     else:
588         return ProcStatus.NOT_STARTED
589     return ProcStatus.RUNNING if status else ProcStatus.FINISHED
590
591 @eintr_retry
592 def rkill(pid, ppid,
593         host = None, 
594         port = None, 
595         user = None, 
596         gwuser = None,
597         gw = None,
598         agent = None, 
599         sudo = False,
600         identity = None, 
601         server_key = None, 
602         nowait = False):
603     """
604     Sends a kill signal to a remote process.
605
606     First tries a SIGTERM, and if the process does not end in 10 seconds,
607     it sends a SIGKILL.
608  
609         :param pid: Process id of process to be killed
610         :type pid: int
611
612         :param ppid: Parent process id of process to be killed
613         :type ppid: int
614
615         :param sudo: Flag indicating if sudo should be used to kill the process
616         :type sudo: bool
617         
618     """
619     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
620     cmd = """
621 SUBKILL="%(subkill)s" ;
622 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
623 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
624 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
625     sleep 0.2 
626     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
627         break
628     else
629         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
630         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
631     fi
632     sleep 1.8
633 done
634 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
635     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
636     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
637 fi
638 """
639     if nowait:
640         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
641
642     (out,err),proc = rexec(
643         cmd % {
644             'ppid' : ppid,
645             'pid' : pid,
646             'sudo' : 'sudo -S' if sudo else '',
647             'subkill' : subkill,
648         },
649         host = host,
650         port = port,
651         user = user,
652         gwuser = gwuser,
653         gw = gw,
654         agent = agent,
655         identity = identity,
656         server_key = server_key
657         )
658     
659     # wait, don't leave zombies around
660     proc.wait()
661
662     return (out, err), proc
663
664 def _retry_rexec(args,
665         log_msg,
666         stdout = subprocess.PIPE,
667         stdin = subprocess.PIPE, 
668         stderr = subprocess.PIPE,
669         env = None,
670         retry = 3,
671         tmp_known_hosts = None,
672         blocking = True):
673
674     for x in xrange(retry):
675         # connects to the remote host and starts a remote connection
676         proc = subprocess.Popen(args,
677                 env = env,
678                 stdout = stdout,
679                 stdin = stdin, 
680                 stderr = stderr)
681         
682         # attach tempfile object to the process, to make sure the file stays
683         # alive until the process is finished with it
684         proc._known_hosts = tmp_known_hosts
685     
686         # The argument block == False forces to rexec to return immediately, 
687         # without blocking 
688         try:
689             err = out = " "
690             if blocking:
691                 #(out, err) = proc.communicate()
692                 # The method communicate was re implemented for performance issues
693                 # when using python subprocess communicate method the ssh commands 
694                 # last one minute each
695                 out, err = _communicate(proc, input=None)
696
697             elif stdout:
698                 out = proc.stdout.read()
699                 if proc.poll() and stderr:
700                     err = proc.stderr.read()
701
702             log(log_msg, logging.DEBUG, out, err)
703
704             if proc.poll():
705                 skip = False
706
707                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
708                     # SSH error, can safely retry
709                     skip = True 
710                 elif retry:
711                     # Probably timed out or plain failed but can retry
712                     skip = True 
713                 
714                 if skip:
715                     t = x*2
716                     msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
717                             t, x, " ".join(args))
718                     log(msg, logging.DEBUG)
719
720                     time.sleep(t)
721                     continue
722             break
723         except RuntimeError, e:
724             msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
725             log(msg, logging.DEBUG, out, err)
726
727             if retry <= 0:
728                 raise
729             retry -= 1
730
731     return ((out, err), proc)
732
733 # POSIX
734 # Don't remove. The method communicate was re implemented for performance issues
735 def _communicate(proc, input, timeout=None, err_on_timeout=True):
736     read_set = []
737     write_set = []
738     stdout = None # Return
739     stderr = None # Return
740
741     killed = False
742
743     if timeout is not None:
744         timelimit = time.time() + timeout
745         killtime = timelimit + 4
746         bailtime = timelimit + 4
747
748     if proc.stdin:
749         # Flush stdio buffer.  This might block, if the user has
750         # been writing to .stdin in an uncontrolled fashion.
751         proc.stdin.flush()
752         if input:
753             write_set.append(proc.stdin)
754         else:
755             proc.stdin.close()
756
757     if proc.stdout:
758         read_set.append(proc.stdout)
759         stdout = []
760
761     if proc.stderr:
762         read_set.append(proc.stderr)
763         stderr = []
764
765     input_offset = 0
766     while read_set or write_set:
767         if timeout is not None:
768             curtime = time.time()
769             if timeout is None or curtime > timelimit:
770                 if curtime > bailtime:
771                     break
772                 elif curtime > killtime:
773                     signum = signal.SIGKILL
774                 else:
775                     signum = signal.SIGTERM
776                 # Lets kill it
777                 os.kill(proc.pid, signum)
778                 select_timeout = 0.5
779             else:
780                 select_timeout = timelimit - curtime + 0.1
781         else:
782             select_timeout = 1.0
783
784         if select_timeout > 1.0:
785             select_timeout = 1.0
786
787         try:
788             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
789         except select.error,e:
790             if e[0] != 4:
791                 raise
792             else:
793                 continue
794
795         if not rlist and not wlist and not xlist and proc.poll() is not None:
796             # timeout and process exited, say bye
797             break
798
799         if proc.stdin in wlist:
800             # When select has indicated that the file is writable,
801             # we can write up to PIPE_BUF bytes without risk
802             # blocking.  POSIX defines PIPE_BUF >= 512
803             bytes_written = os.write(proc.stdin.fileno(),
804                     buffer(input, input_offset, 512))
805             input_offset += bytes_written
806
807             if input_offset >= len(input):
808                 proc.stdin.close()
809                 write_set.remove(proc.stdin)
810
811         if proc.stdout in rlist:
812             data = os.read(proc.stdout.fileno(), 1024)
813             if data == "":
814                 proc.stdout.close()
815                 read_set.remove(proc.stdout)
816             stdout.append(data)
817
818         if proc.stderr in rlist:
819             data = os.read(proc.stderr.fileno(), 1024)
820             if data == "":
821                 proc.stderr.close()
822                 read_set.remove(proc.stderr)
823             stderr.append(data)
824
825     # All data exchanged.  Translate lists into strings.
826     if stdout is not None:
827         stdout = ''.join(stdout)
828     if stderr is not None:
829         stderr = ''.join(stderr)
830
831     # Translate newlines, if requested.  We cannot let the file
832     # object do the translation: It is based on stdio, which is
833     # impossible to combine with select (unless forcing no
834     # buffering).
835     if proc.universal_newlines and hasattr(file, 'newlines'):
836         if stdout:
837             stdout = proc._translate_newlines(stdout)
838         if stderr:
839             stderr = proc._translate_newlines(stderr)
840
841     if killed and err_on_timeout:
842         errcode = proc.poll()
843         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
844     else:
845         if killed:
846             proc.poll()
847         else:
848             proc.wait()
849         return (stdout, stderr)
850
851