0589fb8bcced856d0c98beb7e1ba10bd311e82c5
[nepi.git] / src / neco / util / sshfuncs.py
1 import base64
2 import errno
3 import hashlib
4 import logging
5 import os
6 import os.path
7 import re
8 import select
9 import signal
10 import socket
11 import subprocess
12 import time
13 import tempfile
14
15 logger = logging.getLogger("neco.execution.utils.sshfuncs")
16
17 if hasattr(os, "devnull"):
18     DEV_NULL = os.devnull
19 else:
20     DEV_NULL = "/dev/null"
21
22 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
23
24 class STDOUT: 
25     """
26     Special value that when given to rspawn in stderr causes stderr to 
27     redirect to whatever stdout was redirected to.
28     """
29
30 class RUNNING:
31     """
32     Process is still running
33     """
34
35 class FINISHED:
36     """
37     Process is finished
38     """
39
40 class NOT_STARTED:
41     """
42     Process hasn't started running yet (this should be very rare)
43     """
44
45 hostbyname_cache = dict()
46
47 def gethostbyname(host):
48     hostbyname = hostbyname_cache.get(host)
49     if not hostbyname:
50         hostbyname = socket.gethostbyname(host)
51         hostbyname_cache[host] = hostbyname
52     return hostbyname
53
54 OPENSSH_HAS_PERSIST = None
55
56 def openssh_has_persist():
57     """ The ssh_config options ControlMaster and ControlPersist allow to
58     reuse a same network connection for multiple ssh sessions. In this 
59     way limitations on number of open ssh connections can be bypassed.
60     However, older versions of openSSH do not support this feature.
61     This function is used to determine if ssh connection persist features
62     can be used.
63     """
64     global OPENSSH_HAS_PERSIST
65     if OPENSSH_HAS_PERSIST is None:
66         proc = subprocess.Popen(["ssh","-v"],
67             stdout = subprocess.PIPE,
68             stderr = subprocess.STDOUT,
69             stdin = open("/dev/null","r") )
70         out,err = proc.communicate()
71         proc.wait()
72         
73         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
74         OPENSSH_HAS_PERSIST = bool(vre.match(out))
75     return OPENSSH_HAS_PERSIST
76
77 def make_server_key_args(server_key, host, port):
78     """ Returns a reference to a temporary known_hosts file, to which 
79     the server key has been added. 
80     
81     Make sure to hold onto the temp file reference until the process is 
82     done with it
83
84     :param server_key: the server public key
85     :type server_key: str
86
87     :param host: the hostname
88     :type host: str
89
90     :param port: the ssh port
91     :type port: str
92
93     """
94     if port is not None:
95         host = '%s:%s' % (host, str(port))
96
97     # Create a temporary server key file
98     tmp_known_hosts = tempfile.NamedTemporaryFile()
99    
100     hostbyname = gethostbyname(host) 
101
102     # Add the intended host key
103     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
104     
105     # If we're not in strict mode, add user-configured keys
106     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
107         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
108         if os.access(user_hosts_path, os.R_OK):
109             f = open(user_hosts_path, "r")
110             tmp_known_hosts.write(f.read())
111             f.close()
112         
113     tmp_known_hosts.flush()
114     
115     return tmp_known_hosts
116
117 def make_control_path(agent, forward_x11):
118     ctrl_path = "/tmp/nepi_ssh"
119
120     if agent:
121         ctrl_path +="_a"
122
123     if forward_x11:
124         ctrl_path +="_x"
125
126     ctrl_path += "-%r@%h:%p"
127
128     return ctrl_path
129
130 def shell_escape(s):
131     """ Escapes strings so that they are safe to use as command-line 
132     arguments """
133     if SHELL_SAFE.match(s):
134         # safe string - no escaping needed
135         return s
136     else:
137         # unsafe string - escape
138         def escp(c):
139             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
140                 return c
141             else:
142                 return "'$'\\x%02x''" % (ord(c),)
143         s = ''.join(map(escp,s))
144         return "'%s'" % (s,)
145
146 def eintr_retry(func):
147     """Retries a function invocation when a EINTR occurs"""
148     import functools
149     @functools.wraps(func)
150     def rv(*p, **kw):
151         retry = kw.pop("_retry", False)
152         for i in xrange(0 if retry else 4):
153             try:
154                 return func(*p, **kw)
155             except (select.error, socket.error), args:
156                 if args[0] == errno.EINTR:
157                     continue
158                 else:
159                     raise 
160             except OSError, e:
161                 if e.errno == errno.EINTR:
162                     continue
163                 else:
164                     raise
165         else:
166             return func(*p, **kw)
167     return rv
168
169 def rexec(command, host, user, 
170         port = None, 
171         agent = True,
172         sudo = False,
173         stdin = None,
174         identity = None,
175         server_key = None,
176         env = None,
177         tty = False,
178         timeout = None,
179         retry = 0,
180         err_on_timeout = True,
181         connect_timeout = 30,
182         persistent = True,
183         forward_x11 = False):
184     """
185     Executes a remote command, returns ((stdout,stderr),process)
186     """
187     
188     tmp_known_hosts = None
189     hostip = gethostbyname(host)
190
191     args = ['ssh', '-C',
192             # Don't bother with localhost. Makes test easier
193             '-o', 'NoHostAuthenticationForLocalhost=yes',
194             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
195             '-o', 'ConnectionAttempts=3',
196             '-o', 'ServerAliveInterval=30',
197             '-o', 'TCPKeepAlive=yes',
198             '-l', user, hostip or host]
199
200     if persistent and openssh_has_persist():
201         args.extend([
202             '-o', 'ControlMaster=auto',
203             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
204             '-o', 'ControlPersist=60' ])
205
206     if agent:
207         args.append('-A')
208
209     if port:
210         args.append('-p%d' % port)
211
212     if identity:
213         args.extend(('-i', identity))
214
215     if tty:
216         args.append('-t')
217         args.append('-t')
218
219     if forward_x11:
220         args.append('-X')
221
222     if server_key:
223         # Create a temporary server key file
224         tmp_known_hosts = make_server_key_args(server_key, host, port)
225         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
226
227     args.append(command)
228
229     for x in xrange(retry or 3):
230         # connects to the remote host and starts a remote connection
231         proc = subprocess.Popen(args, 
232                 stdout = subprocess.PIPE,
233                 stdin = subprocess.PIPE, 
234                 stderr = subprocess.PIPE)
235         
236         # attach tempfile object to the process, to make sure the file stays
237         # alive until the process is finished with it
238         proc._known_hosts = tmp_known_hosts
239     
240         try:
241             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
242             logger.debug("COMMAND host %s, command %s, out %s, error %s" % (
243                 host, " ".join(args), out, err))
244
245             if proc.poll():
246                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
247                     # SSH error, can safely retry
248                     continue
249                 elif retry:
250                     # Probably timed out or plain failed but can retry
251                     continue
252             break
253         except RuntimeError, e:
254             logger.debug("EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT ->  %s" % (
255                         host, " ".join(args), out, err, e.args))
256
257             if retry <= 0:
258                 raise
259             retry -= 1
260         
261     return ((out, err), proc)
262
263 def rcopy(source, dest,
264         port = None, 
265         agent = True, 
266         recursive = False,
267         identity = None,
268         server_key = None):
269     """
270     Copies from/to remote sites.
271     
272     Source and destination should have the user and host encoded
273     as per scp specs.
274     
275     If source is a file object, a special mode will be used to
276     create the remote file with the same contents.
277     
278     If dest is a file object, the remote file (source) will be
279     read and written into dest.
280     
281     In these modes, recursive cannot be True.
282     
283     Source can be a list of files to copy to a single destination,
284     in which case it is advised that the destination be a folder.
285     """
286     
287     logger.debug("SCP %s %s" % (source, dest))
288     
289     if isinstance(source, file) and source.tell() == 0:
290         source = source.name
291     elif hasattr(source, 'read'):
292         tmp = tempfile.NamedTemporaryFile()
293         while True:
294             buf = source.read(65536)
295             if buf:
296                 tmp.write(buf)
297             else:
298                 break
299         tmp.seek(0)
300         source = tmp.name
301     
302     if isinstance(source, file) or isinstance(dest, file) \
303             or hasattr(source, 'read')  or hasattr(dest, 'write'):
304         assert not recursive
305         
306         # Parse source/destination as <user>@<server>:<path>
307         if isinstance(dest, basestring) and ':' in dest:
308             remspec, path = dest.split(':',1)
309         elif isinstance(source, basestring) and ':' in source:
310             remspec, path = source.split(':',1)
311         else:
312             raise ValueError, "Both endpoints cannot be local"
313         user,host = remspec.rsplit('@',1)
314         
315         tmp_known_hosts = None
316         hostip = gethostbyname(host)
317         
318         args = ['ssh', '-l', user, '-C',
319                 # Don't bother with localhost. Makes test easier
320                 '-o', 'NoHostAuthenticationForLocalhost=yes',
321                 '-o', 'ConnectTimeout=60',
322                 '-o', 'ConnectionAttempts=3',
323                 '-o', 'ServerAliveInterval=30',
324                 '-o', 'TCPKeepAlive=yes',
325                 hostip or host ]
326
327         if openssh_has_persist():
328             args.extend([
329                 '-o', 'ControlMaster=auto',
330                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
331                 '-o', 'ControlPersist=60' ])
332
333         if port:
334             args.append('-P%d' % port)
335
336         if identity:
337             args.extend(('-i', identity))
338
339         if server_key:
340             # Create a temporary server key file
341             tmp_known_hosts = make_server_key_args(server_key, host, port)
342             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
343         
344         if isinstance(source, file) or hasattr(source, 'read'):
345             args.append('cat > %s' % (shell_escape(path),))
346         elif isinstance(dest, file) or hasattr(dest, 'write'):
347             args.append('cat %s' % (shell_escape(path),))
348         else:
349             raise AssertionError, "Unreachable code reached! :-Q"
350         
351         # connects to the remote host and starts a remote connection
352         if isinstance(source, file):
353             proc = subprocess.Popen(args, 
354                     stdout = open('/dev/null','w'),
355                     stderr = subprocess.PIPE,
356                     stdin = source)
357             err = proc.stderr.read()
358             proc._known_hosts = tmp_known_hosts
359             eintr_retry(proc.wait)()
360             return ((None,err), proc)
361         elif isinstance(dest, file):
362             proc = subprocess.Popen(args, 
363                     stdout = open('/dev/null','w'),
364                     stderr = subprocess.PIPE,
365                     stdin = source)
366             err = proc.stderr.read()
367             proc._known_hosts = tmp_known_hosts
368             eintr_retry(proc.wait)()
369             return ((None,err), proc)
370         elif hasattr(source, 'read'):
371             # file-like (but not file) source
372             proc = subprocess.Popen(args, 
373                     stdout = open('/dev/null','w'),
374                     stderr = subprocess.PIPE,
375                     stdin = subprocess.PIPE)
376             
377             buf = None
378             err = []
379             while True:
380                 if not buf:
381                     buf = source.read(4096)
382                 if not buf:
383                     #EOF
384                     break
385                 
386                 rdrdy, wrdy, broken = select.select(
387                     [proc.stderr],
388                     [proc.stdin],
389                     [proc.stderr,proc.stdin])
390                 
391                 if proc.stderr in rdrdy:
392                     # use os.read for fully unbuffered behavior
393                     err.append(os.read(proc.stderr.fileno(), 4096))
394                 
395                 if proc.stdin in wrdy:
396                     proc.stdin.write(buf)
397                     buf = None
398                 
399                 if broken:
400                     break
401             proc.stdin.close()
402             err.append(proc.stderr.read())
403                 
404             proc._known_hosts = tmp_known_hosts
405             eintr_retry(proc.wait)()
406             return ((None,''.join(err)), proc)
407         elif hasattr(dest, 'write'):
408             # file-like (but not file) dest
409             proc = subprocess.Popen(args, 
410                     stdout = subprocess.PIPE,
411                     stderr = subprocess.PIPE,
412                     stdin = open('/dev/null','w'))
413             
414             buf = None
415             err = []
416             while True:
417                 rdrdy, wrdy, broken = select.select(
418                     [proc.stderr, proc.stdout],
419                     [],
420                     [proc.stderr, proc.stdout])
421                 
422                 if proc.stderr in rdrdy:
423                     # use os.read for fully unbuffered behavior
424                     err.append(os.read(proc.stderr.fileno(), 4096))
425                 
426                 if proc.stdout in rdrdy:
427                     # use os.read for fully unbuffered behavior
428                     buf = os.read(proc.stdout.fileno(), 4096)
429                     dest.write(buf)
430                     
431                     if not buf:
432                         #EOF
433                         break
434                 
435                 if broken:
436                     break
437             err.append(proc.stderr.read())
438                 
439             proc._known_hosts = tmp_known_hosts
440             eintr_retry(proc.wait)()
441             return ((None,''.join(err)), proc)
442         else:
443             raise AssertionError, "Unreachable code reached! :-Q"
444     else:
445         # Parse destination as <user>@<server>:<path>
446         if isinstance(dest, basestring) and ':' in dest:
447             remspec, path = dest.split(':',1)
448         elif isinstance(source, basestring) and ':' in source:
449             remspec, path = source.split(':',1)
450         else:
451             raise ValueError, "Both endpoints cannot be local"
452         user,host = remspec.rsplit('@',1)
453         
454         # plain scp
455         tmp_known_hosts = None
456
457         args = ['scp', '-q', '-p', '-C',
458                 # Don't bother with localhost. Makes test easier
459                 '-o', 'NoHostAuthenticationForLocalhost=yes',
460                 '-o', 'ConnectTimeout=60',
461                 '-o', 'ConnectionAttempts=3',
462                 '-o', 'ServerAliveInterval=30',
463                 '-o', 'TCPKeepAlive=yes' ]
464                 
465         if port:
466             args.append('-P%d' % port)
467
468         if recursive:
469             args.append('-r')
470
471         if identity:
472             args.extend(('-i', identity))
473
474         if server_key:
475             # Create a temporary server key file
476             tmp_known_hosts = make_server_key_args(server_key, host, port)
477             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
478
479         if isinstance(source,list):
480             args.extend(source)
481         else:
482             if openssh_has_persist():
483                 args.extend([
484                     '-o', 'ControlMaster=auto',
485                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
486                     ])
487             args.append(source)
488
489         args.append(dest)
490
491         # connects to the remote host and starts a remote connection
492         proc = subprocess.Popen(args, 
493                 stdout = subprocess.PIPE,
494                 stdin = subprocess.PIPE, 
495                 stderr = subprocess.PIPE)
496         proc._known_hosts = tmp_known_hosts
497         
498         (out, err) = proc.communicate()
499         eintr_retry(proc.wait)()
500         return ((out, err), proc)
501
502 def rspawn(command, pidfile, 
503         stdout = '/dev/null', 
504         stderr = STDOUT, 
505         stdin = '/dev/null', 
506         home = None, 
507         create_home = False, 
508         sudo = False,
509         host = None, 
510         port = None, 
511         user = None, 
512         agent = None, 
513         identity = None, 
514         server_key = None,
515         tty = False):
516     """
517     Spawn a remote command such that it will continue working asynchronously.
518     
519     Parameters:
520         command: the command to run - it should be a single line.
521         
522         pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
523         
524         stdout: path of a file to redirect standard output to - must be a string.
525             Defaults to /dev/null
526         stderr: path of a file to redirect standard error to - string or the special STDOUT value
527             to redirect to the same file stdout was redirected to. Defaults to STDOUT.
528         stdin: path of a file with input to be piped into the command's standard input
529         
530         home: path of a folder to use as working directory - should exist, unless you specify create_home
531         
532         create_home: if True, the home folder will be created first with mkdir -p
533         
534         sudo: whether the command needs to be executed as root
535         
536         host/port/user/agent/identity: see rexec
537     
538     Returns:
539         (stdout, stderr), process
540         
541         Of the spawning process, which only captures errors at spawning time.
542         Usually only useful for diagnostics.
543     """
544     # Start process in a "daemonized" way, using nohup and heavy
545     # stdin/out redirection to avoid connection issues
546     if stderr is STDOUT:
547         stderr = '&1'
548     else:
549         stderr = ' ' + stderr
550     
551     daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
552         'command' : command,
553         'pidfile' : shell_escape(pidfile),
554         'stdout' : stdout,
555         'stderr' : stderr,
556         'stdin' : stdin,
557     }
558     
559     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
560             'command' : shell_escape(daemon_command),
561             'sudo' : 'sudo -S' if sudo else '',
562             'pidfile' : shell_escape(pidfile),
563             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
564             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
565         }
566
567     (out,err),proc = rexec(
568         cmd,
569         host = host,
570         port = port,
571         user = user,
572         agent = agent,
573         identity = identity,
574         server_key = server_key,
575         tty = tty ,
576         )
577     
578     if proc.wait():
579         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
580
581     return ((out, err), proc)
582
583 @eintr_retry
584 def rcheckpid(pidfile,
585         host = None, 
586         port = None, 
587         user = None, 
588         agent = None, 
589         identity = None,
590         server_key = None):
591     """
592     Check the pidfile of a process spawned with remote_spawn.
593     
594     Parameters:
595         pidfile: the pidfile passed to remote_span
596         
597         host/port/user/agent/identity: see rexec
598     
599     Returns:
600         
601         A (pid, ppid) tuple useful for calling remote_status and remote_kill,
602         or None if the pidfile isn't valid yet (maybe the process is still starting).
603     """
604
605     (out,err),proc = rexec(
606         "cat %(pidfile)s" % {
607             'pidfile' : pidfile,
608         },
609         host = host,
610         port = port,
611         user = user,
612         agent = agent,
613         identity = identity,
614         server_key = server_key
615         )
616         
617     if proc.wait():
618         return None
619     
620     if out:
621         try:
622             return map(int,out.strip().split(' ',1))
623         except:
624             # Ignore, many ways to fail that don't matter that much
625             return None
626
627 @eintr_retry
628 def rstatus(pid, ppid, 
629         host = None, 
630         port = None, 
631         user = None, 
632         agent = None, 
633         identity = None,
634         server_key = None):
635     """
636     Check the status of a process spawned with remote_spawn.
637     
638     Parameters:
639         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
640         
641         host/port/user/agent/identity: see rexec
642     
643     Returns:
644         
645         One of NOT_STARTED, RUNNING, FINISHED
646     """
647
648     (out,err),proc = rexec(
649         # Check only by pid. pid+ppid does not always work (especially with sudo) 
650         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
651             'ppid' : ppid,
652             'pid' : pid,
653         },
654         host = host,
655         port = port,
656         user = user,
657         agent = agent,
658         identity = identity,
659         server_key = server_key
660         )
661     
662     if proc.wait():
663         return NOT_STARTED
664     
665     status = False
666     if err:
667         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
668             status = True
669     elif out:
670         status = (out.strip() == 'wait')
671     else:
672         return NOT_STARTED
673     return RUNNING if status else FINISHED
674
675 @eintr_retry
676 def rkill(pid, ppid,
677         host = None, 
678         port = None, 
679         user = None, 
680         agent = None, 
681         sudo = False,
682         identity = None, 
683         server_key = None, 
684         nowait = False):
685     """
686     Kill a process spawned with remote_spawn.
687     
688     First tries a SIGTERM, and if the process does not end in 10 seconds,
689     it sends a SIGKILL.
690     
691     Parameters:
692         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
693         
694         sudo: whether the command was run with sudo - careful killing like this.
695         
696         host/port/user/agent/identity: see rexec
697     
698     Returns:
699         
700         Nothing, should have killed the process
701     """
702     
703     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
704     cmd = """
705 SUBKILL="%(subkill)s" ;
706 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
707 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
708 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
709     sleep 0.2 
710     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
711         break
712     else
713         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
714         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
715     fi
716     sleep 1.8
717 done
718 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
719     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
720     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
721 fi
722 """
723     if nowait:
724         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
725
726     (out,err),proc = rexec(
727         cmd % {
728             'ppid' : ppid,
729             'pid' : pid,
730             'sudo' : 'sudo -S' if sudo else '',
731             'subkill' : subkill,
732         },
733         host = host,
734         port = port,
735         user = user,
736         agent = agent,
737         identity = identity,
738         server_key = server_key
739         )
740     
741     # wait, don't leave zombies around
742     proc.wait()
743
744     return (out, err), proc
745
746 # POSIX
747 def _communicate(self, input, timeout=None, err_on_timeout=True):
748     read_set = []
749     write_set = []
750     stdout = None # Return
751     stderr = None # Return
752     
753     killed = False
754     
755     if timeout is not None:
756         timelimit = time.time() + timeout
757         killtime = timelimit + 4
758         bailtime = timelimit + 4
759
760     if self.stdin:
761         # Flush stdio buffer.  This might block, if the user has
762         # been writing to .stdin in an uncontrolled fashion.
763         self.stdin.flush()
764         if input:
765             write_set.append(self.stdin)
766         else:
767             self.stdin.close()
768     if self.stdout:
769         read_set.append(self.stdout)
770         stdout = []
771     if self.stderr:
772         read_set.append(self.stderr)
773         stderr = []
774
775     input_offset = 0
776     while read_set or write_set:
777         if timeout is not None:
778             curtime = time.time()
779             if timeout is None or curtime > timelimit:
780                 if curtime > bailtime:
781                     break
782                 elif curtime > killtime:
783                     signum = signal.SIGKILL
784                 else:
785                     signum = signal.SIGTERM
786                 # Lets kill it
787                 os.kill(self.pid, signum)
788                 select_timeout = 0.5
789             else:
790                 select_timeout = timelimit - curtime + 0.1
791         else:
792             select_timeout = 1.0
793         
794         if select_timeout > 1.0:
795             select_timeout = 1.0
796             
797         try:
798             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
799         except select.error,e:
800             if e[0] != 4:
801                 raise
802             else:
803                 continue
804         
805         if not rlist and not wlist and not xlist and self.poll() is not None:
806             # timeout and process exited, say bye
807             break
808
809         if self.stdin in wlist:
810             # When select has indicated that the file is writable,
811             # we can write up to PIPE_BUF bytes without risk
812             # blocking.  POSIX defines PIPE_BUF >= 512
813             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
814             input_offset += bytes_written
815             if input_offset >= len(input):
816                 self.stdin.close()
817                 write_set.remove(self.stdin)
818
819         if self.stdout in rlist:
820             data = os.read(self.stdout.fileno(), 1024)
821             if data == "":
822                 self.stdout.close()
823                 read_set.remove(self.stdout)
824             stdout.append(data)
825
826         if self.stderr in rlist:
827             data = os.read(self.stderr.fileno(), 1024)
828             if data == "":
829                 self.stderr.close()
830                 read_set.remove(self.stderr)
831             stderr.append(data)
832     
833     # All data exchanged.  Translate lists into strings.
834     if stdout is not None:
835         stdout = ''.join(stdout)
836     if stderr is not None:
837         stderr = ''.join(stderr)
838
839     # Translate newlines, if requested.  We cannot let the file
840     # object do the translation: It is based on stdio, which is
841     # impossible to combine with select (unless forcing no
842     # buffering).
843     if self.universal_newlines and hasattr(file, 'newlines'):
844         if stdout:
845             stdout = self._translate_newlines(stdout)
846         if stderr:
847             stderr = self._translate_newlines(stderr)
848
849     if killed and err_on_timeout:
850         errcode = self.poll()
851         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
852     else:
853         if killed:
854             self.poll()
855         else:
856             self.wait()
857         return (stdout, stderr)
858