b5d8f0ed8f61a2e46f96cd0cbf39e9b1ff78959c
[nepi.git] / src / neco / util / sshfuncs.py
1 import base64
2 import errno
3 import hashlib
4 import logging
5 import os
6 import os.path
7 import re
8 import select
9 import signal
10 import socket
11 import subprocess
12 import time
13 import tempfile
14
15
16 logger = logging.getLogger("sshfuncs")
17
18 def log(msg, level, out = None, err = None):
19     if out:
20         msg += " - OUT: %s " % out
21
22     if err:
23         msg += " - ERROR: %s " % err
24
25     logger.log(level, msg)
26
27
28 if hasattr(os, "devnull"):
29     DEV_NULL = os.devnull
30 else:
31     DEV_NULL = "/dev/null"
32
33 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
34
35 class STDOUT: 
36     """
37     Special value that when given to rspawn in stderr causes stderr to 
38     redirect to whatever stdout was redirected to.
39     """
40
41 class RUNNING:
42     """
43     Process is still running
44     """
45
46 class FINISHED:
47     """
48     Process is finished
49     """
50
51 class NOT_STARTED:
52     """
53     Process hasn't started running yet (this should be very rare)
54     """
55
56 hostbyname_cache = dict()
57
58 def gethostbyname(host):
59     hostbyname = hostbyname_cache.get(host)
60     if not hostbyname:
61         hostbyname = socket.gethostbyname(host)
62         hostbyname_cache[host] = hostbyname
63     return hostbyname
64
65 OPENSSH_HAS_PERSIST = None
66
67 def openssh_has_persist():
68     """ The ssh_config options ControlMaster and ControlPersist allow to
69     reuse a same network connection for multiple ssh sessions. In this 
70     way limitations on number of open ssh connections can be bypassed.
71     However, older versions of openSSH do not support this feature.
72     This function is used to determine if ssh connection persist features
73     can be used.
74     """
75     global OPENSSH_HAS_PERSIST
76     if OPENSSH_HAS_PERSIST is None:
77         proc = subprocess.Popen(["ssh","-v"],
78             stdout = subprocess.PIPE,
79             stderr = subprocess.STDOUT,
80             stdin = open("/dev/null","r") )
81         out,err = proc.communicate()
82         proc.wait()
83         
84         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
85         OPENSSH_HAS_PERSIST = bool(vre.match(out))
86     return OPENSSH_HAS_PERSIST
87
88 def make_server_key_args(server_key, host, port):
89     """ Returns a reference to a temporary known_hosts file, to which 
90     the server key has been added. 
91     
92     Make sure to hold onto the temp file reference until the process is 
93     done with it
94
95     :param server_key: the server public key
96     :type server_key: str
97
98     :param host: the hostname
99     :type host: str
100
101     :param port: the ssh port
102     :type port: str
103
104     """
105     if port is not None:
106         host = '%s:%s' % (host, str(port))
107
108     # Create a temporary server key file
109     tmp_known_hosts = tempfile.NamedTemporaryFile()
110    
111     hostbyname = gethostbyname(host) 
112
113     # Add the intended host key
114     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
115     
116     # If we're not in strict mode, add user-configured keys
117     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
118         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
119         if os.access(user_hosts_path, os.R_OK):
120             f = open(user_hosts_path, "r")
121             tmp_known_hosts.write(f.read())
122             f.close()
123         
124     tmp_known_hosts.flush()
125     
126     return tmp_known_hosts
127
128 def make_control_path(agent, forward_x11):
129     ctrl_path = "/tmp/nepi_ssh"
130
131     if agent:
132         ctrl_path +="_a"
133
134     if forward_x11:
135         ctrl_path +="_x"
136
137     ctrl_path += "-%r@%h:%p"
138
139     return ctrl_path
140
141 def shell_escape(s):
142     """ Escapes strings so that they are safe to use as command-line 
143     arguments """
144     if SHELL_SAFE.match(s):
145         # safe string - no escaping needed
146         return s
147     else:
148         # unsafe string - escape
149         def escp(c):
150             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
151                 return c
152             else:
153                 return "'$'\\x%02x''" % (ord(c),)
154         s = ''.join(map(escp,s))
155         return "'%s'" % (s,)
156
157 def eintr_retry(func):
158     """Retries a function invocation when a EINTR occurs"""
159     import functools
160     @functools.wraps(func)
161     def rv(*p, **kw):
162         retry = kw.pop("_retry", False)
163         for i in xrange(0 if retry else 4):
164             try:
165                 return func(*p, **kw)
166             except (select.error, socket.error), args:
167                 if args[0] == errno.EINTR:
168                     continue
169                 else:
170                     raise 
171             except OSError, e:
172                 if e.errno == errno.EINTR:
173                     continue
174                 else:
175                     raise
176         else:
177             return func(*p, **kw)
178     return rv
179
180 def rexec(command, host, user, 
181         port = None, 
182         agent = True,
183         sudo = False,
184         stdin = None,
185         identity = None,
186         server_key = None,
187         env = None,
188         tty = False,
189         timeout = None,
190         retry = 3,
191         err_on_timeout = True,
192         connect_timeout = 30,
193         persistent = True,
194         forward_x11 = False):
195     """
196     Executes a remote command, returns ((stdout,stderr),process)
197     """
198     
199     tmp_known_hosts = None
200     hostip = gethostbyname(host)
201
202     args = ['ssh', '-C',
203             # Don't bother with localhost. Makes test easier
204             '-o', 'NoHostAuthenticationForLocalhost=yes',
205             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
206             '-o', 'ConnectionAttempts=3',
207             '-o', 'ServerAliveInterval=30',
208             '-o', 'TCPKeepAlive=yes',
209             '-l', user, hostip or host]
210
211     if persistent and openssh_has_persist():
212         args.extend([
213             '-o', 'ControlMaster=auto',
214             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
215             '-o', 'ControlPersist=60' ])
216
217     if agent:
218         args.append('-A')
219
220     if port:
221         args.append('-p%d' % port)
222
223     if identity:
224         args.extend(('-i', identity))
225
226     if tty:
227         args.append('-t')
228         args.append('-t')
229
230     if forward_x11:
231         args.append('-X')
232
233     if server_key:
234         # Create a temporary server key file
235         tmp_known_hosts = make_server_key_args(server_key, host, port)
236         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
237
238     args.append(command)
239
240     for x in xrange(retry):
241         # connects to the remote host and starts a remote connection
242         proc = subprocess.Popen(args, 
243                 stdout = subprocess.PIPE,
244                 stdin = subprocess.PIPE, 
245                 stderr = subprocess.PIPE)
246         
247         # attach tempfile object to the process, to make sure the file stays
248         # alive until the process is finished with it
249         proc._known_hosts = tmp_known_hosts
250     
251         try:
252             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
253             msg = " rexec - host %s - command %s " % (host, " ".join(args))
254             log(msg, logging.DEBUG, out, err)
255
256             if proc.poll():
257                 skip = False
258
259                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
260                     # SSH error, can safely retry
261                     skip = True 
262                 elif retry:
263                     # Probably timed out or plain failed but can retry
264                     skip = True 
265                 
266                 if skip:
267                     t = x*2
268                     msg = "SLEEPING %d ... ATEMP %d - host %s - command %s " % ( 
269                             t, x, host, " ".join(args))
270                     log(msg, logging.DEBUG)
271
272                     time.sleep(t)
273                     continue
274             break
275         except RuntimeError, e:
276             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
277             log(msg, logging.DEBUG, out, err)
278
279             if retry <= 0:
280                 raise
281             retry -= 1
282         
283     return ((out, err), proc)
284
285 def rcopy(source, dest,
286         port = None, 
287         agent = True, 
288         recursive = False,
289         identity = None,
290         server_key = None):
291     """
292     Copies from/to remote sites.
293     
294     Source and destination should have the user and host encoded
295     as per scp specs.
296     
297     If source is a file object, a special mode will be used to
298     create the remote file with the same contents.
299     
300     If dest is a file object, the remote file (source) will be
301     read and written into dest.
302     
303     In these modes, recursive cannot be True.
304     
305     Source can be a list of files to copy to a single destination,
306     in which case it is advised that the destination be a folder.
307     """
308     
309     msg = " rcopy - scp %s %s " % (source, dest)
310     log(msg, logging.DEBUG)
311     
312     if isinstance(source, file) and source.tell() == 0:
313         source = source.name
314     elif hasattr(source, 'read'):
315         tmp = tempfile.NamedTemporaryFile()
316         while True:
317             buf = source.read(65536)
318             if buf:
319                 tmp.write(buf)
320             else:
321                 break
322         tmp.seek(0)
323         source = tmp.name
324     
325     if isinstance(source, file) or isinstance(dest, file) \
326             or hasattr(source, 'read')  or hasattr(dest, 'write'):
327         assert not recursive
328         
329         # Parse source/destination as <user>@<server>:<path>
330         if isinstance(dest, basestring) and ':' in dest:
331             remspec, path = dest.split(':',1)
332         elif isinstance(source, basestring) and ':' in source:
333             remspec, path = source.split(':',1)
334         else:
335             raise ValueError, "Both endpoints cannot be local"
336         user,host = remspec.rsplit('@',1)
337         
338         tmp_known_hosts = None
339         hostip = gethostbyname(host)
340         
341         args = ['ssh', '-l', user, '-C',
342                 # Don't bother with localhost. Makes test easier
343                 '-o', 'NoHostAuthenticationForLocalhost=yes',
344                 '-o', 'ConnectTimeout=60',
345                 '-o', 'ConnectionAttempts=3',
346                 '-o', 'ServerAliveInterval=30',
347                 '-o', 'TCPKeepAlive=yes',
348                 hostip or host ]
349
350         if openssh_has_persist():
351             args.extend([
352                 '-o', 'ControlMaster=auto',
353                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
354                 '-o', 'ControlPersist=60' ])
355
356         if port:
357             args.append('-P%d' % port)
358
359         if identity:
360             args.extend(('-i', identity))
361
362         if server_key:
363             # Create a temporary server key file
364             tmp_known_hosts = make_server_key_args(server_key, host, port)
365             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
366         
367         if isinstance(source, file) or hasattr(source, 'read'):
368             args.append('cat > %s' % (shell_escape(path),))
369         elif isinstance(dest, file) or hasattr(dest, 'write'):
370             args.append('cat %s' % (shell_escape(path),))
371         else:
372             raise AssertionError, "Unreachable code reached! :-Q"
373         
374         # connects to the remote host and starts a remote connection
375         if isinstance(source, file):
376             proc = subprocess.Popen(args, 
377                     stdout = open('/dev/null','w'),
378                     stderr = subprocess.PIPE,
379                     stdin = source)
380             err = proc.stderr.read()
381             proc._known_hosts = tmp_known_hosts
382             eintr_retry(proc.wait)()
383             return ((None,err), proc)
384         elif isinstance(dest, file):
385             proc = subprocess.Popen(args, 
386                     stdout = open('/dev/null','w'),
387                     stderr = subprocess.PIPE,
388                     stdin = source)
389             err = proc.stderr.read()
390             proc._known_hosts = tmp_known_hosts
391             eintr_retry(proc.wait)()
392             return ((None,err), proc)
393         elif hasattr(source, 'read'):
394             # file-like (but not file) source
395             proc = subprocess.Popen(args, 
396                     stdout = open('/dev/null','w'),
397                     stderr = subprocess.PIPE,
398                     stdin = subprocess.PIPE)
399             
400             buf = None
401             err = []
402             while True:
403                 if not buf:
404                     buf = source.read(4096)
405                 if not buf:
406                     #EOF
407                     break
408                 
409                 rdrdy, wrdy, broken = select.select(
410                     [proc.stderr],
411                     [proc.stdin],
412                     [proc.stderr,proc.stdin])
413                 
414                 if proc.stderr in rdrdy:
415                     # use os.read for fully unbuffered behavior
416                     err.append(os.read(proc.stderr.fileno(), 4096))
417                 
418                 if proc.stdin in wrdy:
419                     proc.stdin.write(buf)
420                     buf = None
421                 
422                 if broken:
423                     break
424             proc.stdin.close()
425             err.append(proc.stderr.read())
426                 
427             proc._known_hosts = tmp_known_hosts
428             eintr_retry(proc.wait)()
429             return ((None,''.join(err)), proc)
430         elif hasattr(dest, 'write'):
431             # file-like (but not file) dest
432             proc = subprocess.Popen(args, 
433                     stdout = subprocess.PIPE,
434                     stderr = subprocess.PIPE,
435                     stdin = open('/dev/null','w'))
436             
437             buf = None
438             err = []
439             while True:
440                 rdrdy, wrdy, broken = select.select(
441                     [proc.stderr, proc.stdout],
442                     [],
443                     [proc.stderr, proc.stdout])
444                 
445                 if proc.stderr in rdrdy:
446                     # use os.read for fully unbuffered behavior
447                     err.append(os.read(proc.stderr.fileno(), 4096))
448                 
449                 if proc.stdout in rdrdy:
450                     # use os.read for fully unbuffered behavior
451                     buf = os.read(proc.stdout.fileno(), 4096)
452                     dest.write(buf)
453                     
454                     if not buf:
455                         #EOF
456                         break
457                 
458                 if broken:
459                     break
460             err.append(proc.stderr.read())
461                 
462             proc._known_hosts = tmp_known_hosts
463             eintr_retry(proc.wait)()
464             return ((None,''.join(err)), proc)
465         else:
466             raise AssertionError, "Unreachable code reached! :-Q"
467     else:
468         # Parse destination as <user>@<server>:<path>
469         if isinstance(dest, basestring) and ':' in dest:
470             remspec, path = dest.split(':',1)
471         elif isinstance(source, basestring) and ':' in source:
472             remspec, path = source.split(':',1)
473         else:
474             raise ValueError, "Both endpoints cannot be local"
475         user,host = remspec.rsplit('@',1)
476         
477         # plain scp
478         tmp_known_hosts = None
479
480         args = ['scp', '-q', '-p', '-C',
481                 # Don't bother with localhost. Makes test easier
482                 '-o', 'NoHostAuthenticationForLocalhost=yes',
483                 '-o', 'ConnectTimeout=60',
484                 '-o', 'ConnectionAttempts=3',
485                 '-o', 'ServerAliveInterval=30',
486                 '-o', 'TCPKeepAlive=yes' ]
487                 
488         if port:
489             args.append('-P%d' % port)
490
491         if recursive:
492             args.append('-r')
493
494         if identity:
495             args.extend(('-i', identity))
496
497         if server_key:
498             # Create a temporary server key file
499             tmp_known_hosts = make_server_key_args(server_key, host, port)
500             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
501
502         if isinstance(source,list):
503             args.extend(source)
504         else:
505             if openssh_has_persist():
506                 args.extend([
507                     '-o', 'ControlMaster=auto',
508                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
509                     ])
510             args.append(source)
511
512         args.append(dest)
513
514         # connects to the remote host and starts a remote connection
515         proc = subprocess.Popen(args, 
516                 stdout = subprocess.PIPE,
517                 stdin = subprocess.PIPE, 
518                 stderr = subprocess.PIPE)
519         proc._known_hosts = tmp_known_hosts
520         
521         (out, err) = proc.communicate()
522         eintr_retry(proc.wait)()
523         return ((out, err), proc)
524
525 def rspawn(command, pidfile, 
526         stdout = '/dev/null', 
527         stderr = STDOUT, 
528         stdin = '/dev/null', 
529         home = None, 
530         create_home = False, 
531         sudo = False,
532         host = None, 
533         port = None, 
534         user = None, 
535         agent = None, 
536         identity = None, 
537         server_key = None,
538         tty = False):
539     """
540     Spawn a remote command such that it will continue working asynchronously.
541     
542     Parameters:
543         command: the command to run - it should be a single line.
544         
545         pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
546         
547         stdout: path of a file to redirect standard output to - must be a string.
548             Defaults to /dev/null
549         stderr: path of a file to redirect standard error to - string or the special STDOUT value
550             to redirect to the same file stdout was redirected to. Defaults to STDOUT.
551         stdin: path of a file with input to be piped into the command's standard input
552         
553         home: path of a folder to use as working directory - should exist, unless you specify create_home
554         
555         create_home: if True, the home folder will be created first with mkdir -p
556         
557         sudo: whether the command needs to be executed as root
558         
559         host/port/user/agent/identity: see rexec
560     
561     Returns:
562         (stdout, stderr), process
563         
564         Of the spawning process, which only captures errors at spawning time.
565         Usually only useful for diagnostics.
566     """
567     # Start process in a "daemonized" way, using nohup and heavy
568     # stdin/out redirection to avoid connection issues
569     if stderr is STDOUT:
570         stderr = '&1'
571     else:
572         stderr = ' ' + stderr
573     
574     daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
575         'command' : command,
576         'pidfile' : shell_escape(pidfile),
577         'stdout' : stdout,
578         'stderr' : stderr,
579         'stdin' : stdin,
580     }
581     
582     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
583             'command' : shell_escape(daemon_command),
584             'sudo' : 'sudo -S' if sudo else '',
585             'pidfile' : shell_escape(pidfile),
586             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
587             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
588         }
589
590     (out,err),proc = rexec(
591         cmd,
592         host = host,
593         port = port,
594         user = user,
595         agent = agent,
596         identity = identity,
597         server_key = server_key,
598         tty = tty ,
599         )
600     
601     if proc.wait():
602         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
603
604     return ((out, err), proc)
605
606 @eintr_retry
607 def rcheckpid(pidfile,
608         host = None, 
609         port = None, 
610         user = None, 
611         agent = None, 
612         identity = None,
613         server_key = None):
614     """
615     Check the pidfile of a process spawned with remote_spawn.
616     
617     Parameters:
618         pidfile: the pidfile passed to remote_span
619         
620         host/port/user/agent/identity: see rexec
621     
622     Returns:
623         
624         A (pid, ppid) tuple useful for calling remote_status and remote_kill,
625         or None if the pidfile isn't valid yet (maybe the process is still starting).
626     """
627
628     (out,err),proc = rexec(
629         "cat %(pidfile)s" % {
630             'pidfile' : pidfile,
631         },
632         host = host,
633         port = port,
634         user = user,
635         agent = agent,
636         identity = identity,
637         server_key = server_key
638         )
639         
640     if proc.wait():
641         return None
642     
643     if out:
644         try:
645             return map(int,out.strip().split(' ',1))
646         except:
647             # Ignore, many ways to fail that don't matter that much
648             return None
649
650 @eintr_retry
651 def rstatus(pid, ppid, 
652         host = None, 
653         port = None, 
654         user = None, 
655         agent = None, 
656         identity = None,
657         server_key = None):
658     """
659     Check the status of a process spawned with remote_spawn.
660     
661     Parameters:
662         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
663         
664         host/port/user/agent/identity: see rexec
665     
666     Returns:
667         
668         One of NOT_STARTED, RUNNING, FINISHED
669     """
670
671     (out,err),proc = rexec(
672         # Check only by pid. pid+ppid does not always work (especially with sudo) 
673         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
674             'ppid' : ppid,
675             'pid' : pid,
676         },
677         host = host,
678         port = port,
679         user = user,
680         agent = agent,
681         identity = identity,
682         server_key = server_key
683         )
684     
685     if proc.wait():
686         return NOT_STARTED
687     
688     status = False
689     if err:
690         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
691             status = True
692     elif out:
693         status = (out.strip() == 'wait')
694     else:
695         return NOT_STARTED
696     return RUNNING if status else FINISHED
697
698 @eintr_retry
699 def rkill(pid, ppid,
700         host = None, 
701         port = None, 
702         user = None, 
703         agent = None, 
704         sudo = False,
705         identity = None, 
706         server_key = None, 
707         nowait = False):
708     """
709     Kill a process spawned with remote_spawn.
710     
711     First tries a SIGTERM, and if the process does not end in 10 seconds,
712     it sends a SIGKILL.
713     
714     Parameters:
715         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
716         
717         sudo: whether the command was run with sudo - careful killing like this.
718         
719         host/port/user/agent/identity: see rexec
720     
721     Returns:
722         
723         Nothing, should have killed the process
724     """
725     
726     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
727     cmd = """
728 SUBKILL="%(subkill)s" ;
729 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
730 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
731 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
732     sleep 0.2 
733     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
734         break
735     else
736         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
737         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
738     fi
739     sleep 1.8
740 done
741 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
742     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
743     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
744 fi
745 """
746     if nowait:
747         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
748
749     (out,err),proc = rexec(
750         cmd % {
751             'ppid' : ppid,
752             'pid' : pid,
753             'sudo' : 'sudo -S' if sudo else '',
754             'subkill' : subkill,
755         },
756         host = host,
757         port = port,
758         user = user,
759         agent = agent,
760         identity = identity,
761         server_key = server_key
762         )
763     
764     # wait, don't leave zombies around
765     proc.wait()
766
767     return (out, err), proc
768
769 # POSIX
770 def _communicate(self, input, timeout=None, err_on_timeout=True):
771     read_set = []
772     write_set = []
773     stdout = None # Return
774     stderr = None # Return
775     
776     killed = False
777     
778     if timeout is not None:
779         timelimit = time.time() + timeout
780         killtime = timelimit + 4
781         bailtime = timelimit + 4
782
783     if self.stdin:
784         # Flush stdio buffer.  This might block, if the user has
785         # been writing to .stdin in an uncontrolled fashion.
786         self.stdin.flush()
787         if input:
788             write_set.append(self.stdin)
789         else:
790             self.stdin.close()
791     if self.stdout:
792         read_set.append(self.stdout)
793         stdout = []
794     if self.stderr:
795         read_set.append(self.stderr)
796         stderr = []
797
798     input_offset = 0
799     while read_set or write_set:
800         if timeout is not None:
801             curtime = time.time()
802             if timeout is None or curtime > timelimit:
803                 if curtime > bailtime:
804                     break
805                 elif curtime > killtime:
806                     signum = signal.SIGKILL
807                 else:
808                     signum = signal.SIGTERM
809                 # Lets kill it
810                 os.kill(self.pid, signum)
811                 select_timeout = 0.5
812             else:
813                 select_timeout = timelimit - curtime + 0.1
814         else:
815             select_timeout = 1.0
816         
817         if select_timeout > 1.0:
818             select_timeout = 1.0
819             
820         try:
821             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
822         except select.error,e:
823             if e[0] != 4:
824                 raise
825             else:
826                 continue
827         
828         if not rlist and not wlist and not xlist and self.poll() is not None:
829             # timeout and process exited, say bye
830             break
831
832         if self.stdin in wlist:
833             # When select has indicated that the file is writable,
834             # we can write up to PIPE_BUF bytes without risk
835             # blocking.  POSIX defines PIPE_BUF >= 512
836             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
837             input_offset += bytes_written
838             if input_offset >= len(input):
839                 self.stdin.close()
840                 write_set.remove(self.stdin)
841
842         if self.stdout in rlist:
843             data = os.read(self.stdout.fileno(), 1024)
844             if data == "":
845                 self.stdout.close()
846                 read_set.remove(self.stdout)
847             stdout.append(data)
848
849         if self.stderr in rlist:
850             data = os.read(self.stderr.fileno(), 1024)
851             if data == "":
852                 self.stderr.close()
853                 read_set.remove(self.stderr)
854             stderr.append(data)
855     
856     # All data exchanged.  Translate lists into strings.
857     if stdout is not None:
858         stdout = ''.join(stdout)
859     if stderr is not None:
860         stderr = ''.join(stderr)
861
862     # Translate newlines, if requested.  We cannot let the file
863     # object do the translation: It is based on stdio, which is
864     # impossible to combine with select (unless forcing no
865     # buffering).
866     if self.universal_newlines and hasattr(file, 'newlines'):
867         if stdout:
868             stdout = self._translate_newlines(stdout)
869         if stderr:
870             stderr = self._translate_newlines(stderr)
871
872     if killed and err_on_timeout:
873         errcode = self.poll()
874         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
875     else:
876         if killed:
877             self.poll()
878         else:
879             self.wait()
880         return (stdout, stderr)
881