SSHApi functionality migrated to LinuxNode
[nepi.git] / src / neco / util / sshfuncs.py
1 import base64
2 import errno
3 import os
4 import os.path
5 import select
6 import signal
7 import socket
8 import subprocess
9 import time
10 import traceback
11 import re
12 import tempfile
13 import hashlib
14
15 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
16
17 if hasattr(os, "devnull"):
18     DEV_NULL = os.devnull
19 else:
20     DEV_NULL = "/dev/null"
21
22 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
23
24 class STDOUT: 
25     """
26     Special value that when given to rspawn in stderr causes stderr to 
27     redirect to whatever stdout was redirected to.
28     """
29
30 class RUNNING:
31     """
32     Process is still running
33     """
34
35 class FINISHED:
36     """
37     Process is finished
38     """
39
40 class NOT_STARTED:
41     """
42     Process hasn't started running yet (this should be very rare)
43     """
44
45 hostbyname_cache = dict()
46
47 def gethostbyname(host):
48     hostbyname = hostbyname_cache.get(host)
49     if not hostbyname:
50         hostbyname = socket.gethostbyname(host)
51         hostbyname_cache[host] = hostbyname
52     return hostbyname
53
54 OPENSSH_HAS_PERSIST = None
55
56 def openssh_has_persist():
57     """ The ssh_config options ControlMaster and ControlPersist allow to
58     reuse a same network connection for multiple ssh sessions. In this 
59     way limitations on number of open ssh connections can be bypassed.
60     However, older versions of openSSH do not support this feature.
61     This function is used to determine if ssh connection persist features
62     can be used.
63     """
64     global OPENSSH_HAS_PERSIST
65     if OPENSSH_HAS_PERSIST is None:
66         proc = subprocess.Popen(["ssh","-v"],
67             stdout = subprocess.PIPE,
68             stderr = subprocess.STDOUT,
69             stdin = open("/dev/null","r") )
70         out,err = proc.communicate()
71         proc.wait()
72         
73         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
74         OPENSSH_HAS_PERSIST = bool(vre.match(out))
75     return OPENSSH_HAS_PERSIST
76
77 def make_server_key_args(server_key, host, port):
78     """ Returns a reference to a temporary known_hosts file, to which 
79     the server key has been added. 
80     
81     Make sure to hold onto the temp file reference until the process is 
82     done with it
83
84     :param server_key: the server public key
85     :type server_key: str
86
87     :param host: the hostname
88     :type host: str
89
90     :param port: the ssh port
91     :type port: str
92
93     """
94     if port is not None:
95         host = '%s:%s' % (host, str(port))
96
97     # Create a temporary server key file
98     tmp_known_hosts = tempfile.NamedTemporaryFile()
99    
100     hostbyname = gethostbyname(host) 
101
102     # Add the intended host key
103     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
104     
105     # If we're not in strict mode, add user-configured keys
106     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
107         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
108         if os.access(user_hosts_path, os.R_OK):
109             f = open(user_hosts_path, "r")
110             tmp_known_hosts.write(f.read())
111             f.close()
112         
113     tmp_known_hosts.flush()
114     
115     return tmp_known_hosts
116
117 def make_control_path(agent, forward_x11):
118     ctrl_path = "/tmp/nepi_ssh"
119
120     if agent:
121         ctrl_path +="_a"
122
123     if forward_x11:
124         ctrl_path +="_x"
125
126     ctrl_path += "-%r@%h:%p"
127
128     return ctrl_path
129
130 def shell_escape(s):
131     """ Escapes strings so that they are safe to use as command-line 
132     arguments """
133     if SHELL_SAFE.match(s):
134         # safe string - no escaping needed
135         return s
136     else:
137         # unsafe string - escape
138         def escp(c):
139             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
140                 return c
141             else:
142                 return "'$'\\x%02x''" % (ord(c),)
143         s = ''.join(map(escp,s))
144         return "'%s'" % (s,)
145
146 def eintr_retry(func):
147     """Retries a function invocation when a EINTR occurs"""
148     import functools
149     @functools.wraps(func)
150     def rv(*p, **kw):
151         retry = kw.pop("_retry", False)
152         for i in xrange(0 if retry else 4):
153             try:
154                 return func(*p, **kw)
155             except (select.error, socket.error), args:
156                 if args[0] == errno.EINTR:
157                     continue
158                 else:
159                     raise 
160             except OSError, e:
161                 if e.errno == errno.EINTR:
162                     continue
163                 else:
164                     raise
165         else:
166             return func(*p, **kw)
167     return rv
168
169 def rexec(command, host, user, 
170         port = None, 
171         agent = True,
172         sudo = False,
173         stdin = None,
174         identity = None,
175         server_key = None,
176         env = None,
177         tty = False,
178         timeout = None,
179         retry = 0,
180         err_on_timeout = True,
181         connect_timeout = 30,
182         persistent = True,
183         forward_x11 = False):
184     """
185     Executes a remote command, returns ((stdout,stderr),process)
186     """
187     
188     tmp_known_hosts = None
189     hostip = gethostbyname(host)
190
191     args = ['ssh', '-C',
192             # Don't bother with localhost. Makes test easier
193             '-o', 'NoHostAuthenticationForLocalhost=yes',
194             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
195             '-o', 'ConnectionAttempts=3',
196             '-o', 'ServerAliveInterval=30',
197             '-o', 'TCPKeepAlive=yes',
198             '-l', user, hostip or host]
199
200     if persistent and openssh_has_persist():
201         args.extend([
202             '-o', 'ControlMaster=auto',
203             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
204             '-o', 'ControlPersist=60' ])
205
206     if agent:
207         args.append('-A')
208
209     if port:
210         args.append('-p%d' % port)
211
212     if identity:
213         args.extend(('-i', identity))
214
215     if tty:
216         args.append('-t')
217         args.append('-t')
218
219     if forward_x11:
220         args.append('-X')
221
222     if server_key:
223         # Create a temporary server key file
224         tmp_known_hosts = make_server_key_args(server_key, host, port)
225         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
226
227     args.append(command)
228
229     for x in xrange(retry or 3):
230         # connects to the remote host and starts a remote connection
231         proc = subprocess.Popen(args, 
232                 stdout = subprocess.PIPE,
233                 stdin = subprocess.PIPE, 
234                 stderr = subprocess.PIPE)
235         
236         # attach tempfile object to the process, to make sure the file stays
237         # alive until the process is finished with it
238         proc._known_hosts = tmp_known_hosts
239     
240         try:
241             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
242             if TRACE:
243                 print "COMMAND host %s, command %s, out %s, error %s" % (host, " ".join(args), out, err)
244
245             if proc.poll():
246                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
247                     # SSH error, can safely retry
248                     continue
249                 elif retry:
250                     # Probably timed out or plain failed but can retry
251                     continue
252             break
253         except RuntimeError, e:
254             if TRACE:
255                 print "EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT ->  %s" % (
256                         host, " ".join(args), out, err, e.args)
257
258             if retry <= 0:
259                 raise
260             retry -= 1
261         
262     return ((out, err), proc)
263
264 def rcopy(source, dest,
265         port = None, 
266         agent = True, 
267         recursive = False,
268         identity = None,
269         server_key = None):
270     """
271     Copies from/to remote sites.
272     
273     Source and destination should have the user and host encoded
274     as per scp specs.
275     
276     If source is a file object, a special mode will be used to
277     create the remote file with the same contents.
278     
279     If dest is a file object, the remote file (source) will be
280     read and written into dest.
281     
282     In these modes, recursive cannot be True.
283     
284     Source can be a list of files to copy to a single destination,
285     in which case it is advised that the destination be a folder.
286     """
287     
288     if TRACE:
289         print "scp", source, dest
290     
291     if isinstance(source, file) and source.tell() == 0:
292         source = source.name
293     elif hasattr(source, 'read'):
294         tmp = tempfile.NamedTemporaryFile()
295         while True:
296             buf = source.read(65536)
297             if buf:
298                 tmp.write(buf)
299             else:
300                 break
301         tmp.seek(0)
302         source = tmp.name
303     
304     if isinstance(source, file) or isinstance(dest, file) \
305             or hasattr(source, 'read')  or hasattr(dest, 'write'):
306         assert not recursive
307         
308         # Parse source/destination as <user>@<server>:<path>
309         if isinstance(dest, basestring) and ':' in dest:
310             remspec, path = dest.split(':',1)
311         elif isinstance(source, basestring) and ':' in source:
312             remspec, path = source.split(':',1)
313         else:
314             raise ValueError, "Both endpoints cannot be local"
315         user,host = remspec.rsplit('@',1)
316         
317         tmp_known_hosts = None
318         hostip = gethostbyname(host)
319         
320         args = ['ssh', '-l', user, '-C',
321                 # Don't bother with localhost. Makes test easier
322                 '-o', 'NoHostAuthenticationForLocalhost=yes',
323                 '-o', 'ConnectTimeout=60',
324                 '-o', 'ConnectionAttempts=3',
325                 '-o', 'ServerAliveInterval=30',
326                 '-o', 'TCPKeepAlive=yes',
327                 hostip or host ]
328
329         if openssh_has_persist():
330             args.extend([
331                 '-o', 'ControlMaster=auto',
332                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
333                 '-o', 'ControlPersist=60' ])
334
335         if port:
336             args.append('-P%d' % port)
337
338         if identity:
339             args.extend(('-i', identity))
340
341         if server_key:
342             # Create a temporary server key file
343             tmp_known_hosts = make_server_key_args(server_key, host, port)
344             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
345         
346         if isinstance(source, file) or hasattr(source, 'read'):
347             args.append('cat > %s' % (shell_escape(path),))
348         elif isinstance(dest, file) or hasattr(dest, 'write'):
349             args.append('cat %s' % (shell_escape(path),))
350         else:
351             raise AssertionError, "Unreachable code reached! :-Q"
352         
353         # connects to the remote host and starts a remote connection
354         if isinstance(source, file):
355             proc = subprocess.Popen(args, 
356                     stdout = open('/dev/null','w'),
357                     stderr = subprocess.PIPE,
358                     stdin = source)
359             err = proc.stderr.read()
360             proc._known_hosts = tmp_known_hosts
361             eintr_retry(proc.wait)()
362             return ((None,err), proc)
363         elif isinstance(dest, file):
364             proc = subprocess.Popen(args, 
365                     stdout = open('/dev/null','w'),
366                     stderr = subprocess.PIPE,
367                     stdin = source)
368             err = proc.stderr.read()
369             proc._known_hosts = tmp_known_hosts
370             eintr_retry(proc.wait)()
371             return ((None,err), proc)
372         elif hasattr(source, 'read'):
373             # file-like (but not file) source
374             proc = subprocess.Popen(args, 
375                     stdout = open('/dev/null','w'),
376                     stderr = subprocess.PIPE,
377                     stdin = subprocess.PIPE)
378             
379             buf = None
380             err = []
381             while True:
382                 if not buf:
383                     buf = source.read(4096)
384                 if not buf:
385                     #EOF
386                     break
387                 
388                 rdrdy, wrdy, broken = select.select(
389                     [proc.stderr],
390                     [proc.stdin],
391                     [proc.stderr,proc.stdin])
392                 
393                 if proc.stderr in rdrdy:
394                     # use os.read for fully unbuffered behavior
395                     err.append(os.read(proc.stderr.fileno(), 4096))
396                 
397                 if proc.stdin in wrdy:
398                     proc.stdin.write(buf)
399                     buf = None
400                 
401                 if broken:
402                     break
403             proc.stdin.close()
404             err.append(proc.stderr.read())
405                 
406             proc._known_hosts = tmp_known_hosts
407             eintr_retry(proc.wait)()
408             return ((None,''.join(err)), proc)
409         elif hasattr(dest, 'write'):
410             # file-like (but not file) dest
411             proc = subprocess.Popen(args, 
412                     stdout = subprocess.PIPE,
413                     stderr = subprocess.PIPE,
414                     stdin = open('/dev/null','w'))
415             
416             buf = None
417             err = []
418             while True:
419                 rdrdy, wrdy, broken = select.select(
420                     [proc.stderr, proc.stdout],
421                     [],
422                     [proc.stderr, proc.stdout])
423                 
424                 if proc.stderr in rdrdy:
425                     # use os.read for fully unbuffered behavior
426                     err.append(os.read(proc.stderr.fileno(), 4096))
427                 
428                 if proc.stdout in rdrdy:
429                     # use os.read for fully unbuffered behavior
430                     buf = os.read(proc.stdout.fileno(), 4096)
431                     dest.write(buf)
432                     
433                     if not buf:
434                         #EOF
435                         break
436                 
437                 if broken:
438                     break
439             err.append(proc.stderr.read())
440                 
441             proc._known_hosts = tmp_known_hosts
442             eintr_retry(proc.wait)()
443             return ((None,''.join(err)), proc)
444         else:
445             raise AssertionError, "Unreachable code reached! :-Q"
446     else:
447         # Parse destination as <user>@<server>:<path>
448         if isinstance(dest, basestring) and ':' in dest:
449             remspec, path = dest.split(':',1)
450         elif isinstance(source, basestring) and ':' in source:
451             remspec, path = source.split(':',1)
452         else:
453             raise ValueError, "Both endpoints cannot be local"
454         user,host = remspec.rsplit('@',1)
455         
456         # plain scp
457         tmp_known_hosts = None
458
459         args = ['scp', '-q', '-p', '-C',
460                 # Don't bother with localhost. Makes test easier
461                 '-o', 'NoHostAuthenticationForLocalhost=yes',
462                 '-o', 'ConnectTimeout=60',
463                 '-o', 'ConnectionAttempts=3',
464                 '-o', 'ServerAliveInterval=30',
465                 '-o', 'TCPKeepAlive=yes' ]
466                 
467         if port:
468             args.append('-P%d' % port)
469
470         if recursive:
471             args.append('-r')
472
473         if identity:
474             args.extend(('-i', identity))
475
476         if server_key:
477             # Create a temporary server key file
478             tmp_known_hosts = make_server_key_args(server_key, host, port)
479             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
480
481         if isinstance(source,list):
482             args.extend(source)
483         else:
484             if openssh_has_persist():
485                 args.extend([
486                     '-o', 'ControlMaster=auto',
487                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
488                     ])
489             args.append(source)
490
491         args.append(dest)
492
493         # connects to the remote host and starts a remote connection
494         proc = subprocess.Popen(args, 
495                 stdout = subprocess.PIPE,
496                 stdin = subprocess.PIPE, 
497                 stderr = subprocess.PIPE)
498         proc._known_hosts = tmp_known_hosts
499         
500         (out, err) = proc.communicate()
501         eintr_retry(proc.wait)()
502         return ((out, err), proc)
503
504 def rspawn(command, pidfile, 
505         stdout = '/dev/null', 
506         stderr = STDOUT, 
507         stdin = '/dev/null', 
508         home = None, 
509         create_home = False, 
510         sudo = False,
511         host = None, 
512         port = None, 
513         user = None, 
514         agent = None, 
515         identity = None, 
516         server_key = None,
517         tty = False):
518     """
519     Spawn a remote command such that it will continue working asynchronously.
520     
521     Parameters:
522         command: the command to run - it should be a single line.
523         
524         pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
525         
526         stdout: path of a file to redirect standard output to - must be a string.
527             Defaults to /dev/null
528         stderr: path of a file to redirect standard error to - string or the special STDOUT value
529             to redirect to the same file stdout was redirected to. Defaults to STDOUT.
530         stdin: path of a file with input to be piped into the command's standard input
531         
532         home: path of a folder to use as working directory - should exist, unless you specify create_home
533         
534         create_home: if True, the home folder will be created first with mkdir -p
535         
536         sudo: whether the command needs to be executed as root
537         
538         host/port/user/agent/identity: see rexec
539     
540     Returns:
541         (stdout, stderr), process
542         
543         Of the spawning process, which only captures errors at spawning time.
544         Usually only useful for diagnostics.
545     """
546     # Start process in a "daemonized" way, using nohup and heavy
547     # stdin/out redirection to avoid connection issues
548     if stderr is STDOUT:
549         stderr = '&1'
550     else:
551         stderr = ' ' + stderr
552     
553     daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
554         'command' : command,
555         'pidfile' : shell_escape(pidfile),
556         'stdout' : stdout,
557         'stderr' : stderr,
558         'stdin' : stdin,
559     }
560     
561     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
562             'command' : shell_escape(daemon_command),
563             'sudo' : 'sudo -S' if sudo else '',
564             'pidfile' : shell_escape(pidfile),
565             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
566             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
567         }
568
569     (out,err),proc = rexec(
570         cmd,
571         host = host,
572         port = port,
573         user = user,
574         agent = agent,
575         identity = identity,
576         server_key = server_key,
577         tty = tty ,
578         )
579     
580     if proc.wait():
581         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
582
583     return ((out, err), proc)
584
585 @eintr_retry
586 def rcheckpid(pidfile,
587         host = None, 
588         port = None, 
589         user = None, 
590         agent = None, 
591         identity = None,
592         server_key = None):
593     """
594     Check the pidfile of a process spawned with remote_spawn.
595     
596     Parameters:
597         pidfile: the pidfile passed to remote_span
598         
599         host/port/user/agent/identity: see rexec
600     
601     Returns:
602         
603         A (pid, ppid) tuple useful for calling remote_status and remote_kill,
604         or None if the pidfile isn't valid yet (maybe the process is still starting).
605     """
606
607     (out,err),proc = rexec(
608         "cat %(pidfile)s" % {
609             'pidfile' : pidfile,
610         },
611         host = host,
612         port = port,
613         user = user,
614         agent = agent,
615         identity = identity,
616         server_key = server_key
617         )
618         
619     if proc.wait():
620         return None
621     
622     if out:
623         try:
624             return map(int,out.strip().split(' ',1))
625         except:
626             # Ignore, many ways to fail that don't matter that much
627             return None
628
629 @eintr_retry
630 def rstatus(pid, ppid, 
631         host = None, 
632         port = None, 
633         user = None, 
634         agent = None, 
635         identity = None,
636         server_key = None):
637     """
638     Check the status of a process spawned with remote_spawn.
639     
640     Parameters:
641         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
642         
643         host/port/user/agent/identity: see rexec
644     
645     Returns:
646         
647         One of NOT_STARTED, RUNNING, FINISHED
648     """
649
650     (out,err),proc = rexec(
651         # Check only by pid. pid+ppid does not always work (especially with sudo) 
652         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
653             'ppid' : ppid,
654             'pid' : pid,
655         },
656         host = host,
657         port = port,
658         user = user,
659         agent = agent,
660         identity = identity,
661         server_key = server_key
662         )
663     
664     if proc.wait():
665         return NOT_STARTED
666     
667     status = False
668     if err:
669         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
670             status = True
671     elif out:
672         status = (out.strip() == 'wait')
673     else:
674         return NOT_STARTED
675     return RUNNING if status else FINISHED
676
677 @eintr_retry
678 def rkill(pid, ppid,
679         host = None, 
680         port = None, 
681         user = None, 
682         agent = None, 
683         sudo = False,
684         identity = None, 
685         server_key = None, 
686         nowait = False):
687     """
688     Kill a process spawned with remote_spawn.
689     
690     First tries a SIGTERM, and if the process does not end in 10 seconds,
691     it sends a SIGKILL.
692     
693     Parameters:
694         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
695         
696         sudo: whether the command was run with sudo - careful killing like this.
697         
698         host/port/user/agent/identity: see rexec
699     
700     Returns:
701         
702         Nothing, should have killed the process
703     """
704     
705     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
706     cmd = """
707 SUBKILL="%(subkill)s" ;
708 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
709 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
710 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
711     sleep 0.2 
712     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
713         break
714     else
715         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
716         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
717     fi
718     sleep 1.8
719 done
720 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
721     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
722     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
723 fi
724 """
725     if nowait:
726         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
727
728     (out,err),proc = rexec(
729         cmd % {
730             'ppid' : ppid,
731             'pid' : pid,
732             'sudo' : 'sudo -S' if sudo else '',
733             'subkill' : subkill,
734         },
735         host = host,
736         port = port,
737         user = user,
738         agent = agent,
739         identity = identity,
740         server_key = server_key
741         )
742     
743     # wait, don't leave zombies around
744     proc.wait()
745
746     return (out, err), proc
747
748 # POSIX
749 def _communicate(self, input, timeout=None, err_on_timeout=True):
750     read_set = []
751     write_set = []
752     stdout = None # Return
753     stderr = None # Return
754     
755     killed = False
756     
757     if timeout is not None:
758         timelimit = time.time() + timeout
759         killtime = timelimit + 4
760         bailtime = timelimit + 4
761
762     if self.stdin:
763         # Flush stdio buffer.  This might block, if the user has
764         # been writing to .stdin in an uncontrolled fashion.
765         self.stdin.flush()
766         if input:
767             write_set.append(self.stdin)
768         else:
769             self.stdin.close()
770     if self.stdout:
771         read_set.append(self.stdout)
772         stdout = []
773     if self.stderr:
774         read_set.append(self.stderr)
775         stderr = []
776
777     input_offset = 0
778     while read_set or write_set:
779         if timeout is not None:
780             curtime = time.time()
781             if timeout is None or curtime > timelimit:
782                 if curtime > bailtime:
783                     break
784                 elif curtime > killtime:
785                     signum = signal.SIGKILL
786                 else:
787                     signum = signal.SIGTERM
788                 # Lets kill it
789                 os.kill(self.pid, signum)
790                 select_timeout = 0.5
791             else:
792                 select_timeout = timelimit - curtime + 0.1
793         else:
794             select_timeout = 1.0
795         
796         if select_timeout > 1.0:
797             select_timeout = 1.0
798             
799         try:
800             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
801         except select.error,e:
802             if e[0] != 4:
803                 raise
804             else:
805                 continue
806         
807         if not rlist and not wlist and not xlist and self.poll() is not None:
808             # timeout and process exited, say bye
809             break
810
811         if self.stdin in wlist:
812             # When select has indicated that the file is writable,
813             # we can write up to PIPE_BUF bytes without risk
814             # blocking.  POSIX defines PIPE_BUF >= 512
815             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
816             input_offset += bytes_written
817             if input_offset >= len(input):
818                 self.stdin.close()
819                 write_set.remove(self.stdin)
820
821         if self.stdout in rlist:
822             data = os.read(self.stdout.fileno(), 1024)
823             if data == "":
824                 self.stdout.close()
825                 read_set.remove(self.stdout)
826             stdout.append(data)
827
828         if self.stderr in rlist:
829             data = os.read(self.stderr.fileno(), 1024)
830             if data == "":
831                 self.stderr.close()
832                 read_set.remove(self.stderr)
833             stderr.append(data)
834     
835     # All data exchanged.  Translate lists into strings.
836     if stdout is not None:
837         stdout = ''.join(stdout)
838     if stderr is not None:
839         stderr = ''.join(stderr)
840
841     # Translate newlines, if requested.  We cannot let the file
842     # object do the translation: It is based on stdio, which is
843     # impossible to combine with select (unless forcing no
844     # buffering).
845     if self.universal_newlines and hasattr(file, 'newlines'):
846         if stdout:
847             stdout = self._translate_newlines(stdout)
848         if stderr:
849             stderr = self._translate_newlines(stderr)
850
851     if killed and err_on_timeout:
852         errcode = self.poll()
853         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
854     else:
855         if killed:
856             self.poll()
857         else:
858             self.wait()
859         return (stdout, stderr)
860