Adding Linux Application scalability tests
[nepi.git] / src / neco / util / sshfuncs.py
1 import base64
2 import errno
3 import hashlib
4 import logging
5 import os
6 import os.path
7 import re
8 import select
9 import signal
10 import socket
11 import subprocess
12 import time
13 import tempfile
14
15 # TODO: Add retries to rcopy!! rcopy is not being retried!
16
17 logger = logging.getLogger("sshfuncs")
18
19 def log(msg, level, out = None, err = None):
20     if out:
21         msg += " - OUT: %s " % out
22
23     if err:
24         msg += " - ERROR: %s " % err
25
26     logger.log(level, msg)
27
28
29 if hasattr(os, "devnull"):
30     DEV_NULL = os.devnull
31 else:
32     DEV_NULL = "/dev/null"
33
34 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
35
36 class STDOUT: 
37     """
38     Special value that when given to rspawn in stderr causes stderr to 
39     redirect to whatever stdout was redirected to.
40     """
41
42 class RUNNING:
43     """
44     Process is still running
45     """
46
47 class FINISHED:
48     """
49     Process is finished
50     """
51
52 class NOT_STARTED:
53     """
54     Process hasn't started running yet (this should be very rare)
55     """
56
57 hostbyname_cache = dict()
58
59 def gethostbyname(host):
60     global hostbyname_cache
61     
62     hostbyname = hostbyname_cache.get(host)
63     if not hostbyname:
64         hostbyname = socket.gethostbyname(host)
65         hostbyname_cache[host] = hostbyname
66     return hostbyname
67
68 OPENSSH_HAS_PERSIST = None
69
70 def openssh_has_persist():
71     """ The ssh_config options ControlMaster and ControlPersist allow to
72     reuse a same network connection for multiple ssh sessions. In this 
73     way limitations on number of open ssh connections can be bypassed.
74     However, older versions of openSSH do not support this feature.
75     This function is used to determine if ssh connection persist features
76     can be used.
77     """
78     global OPENSSH_HAS_PERSIST
79     if OPENSSH_HAS_PERSIST is None:
80         proc = subprocess.Popen(["ssh","-v"],
81             stdout = subprocess.PIPE,
82             stderr = subprocess.STDOUT,
83             stdin = open("/dev/null","r") )
84         out,err = proc.communicate()
85         proc.wait()
86         
87         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
88         OPENSSH_HAS_PERSIST = bool(vre.match(out))
89     return OPENSSH_HAS_PERSIST
90
91 def make_server_key_args(server_key, host, port):
92     """ Returns a reference to a temporary known_hosts file, to which 
93     the server key has been added. 
94     
95     Make sure to hold onto the temp file reference until the process is 
96     done with it
97
98     :param server_key: the server public key
99     :type server_key: str
100
101     :param host: the hostname
102     :type host: str
103
104     :param port: the ssh port
105     :type port: str
106
107     """
108     if port is not None:
109         host = '%s:%s' % (host, str(port))
110
111     # Create a temporary server key file
112     tmp_known_hosts = tempfile.NamedTemporaryFile()
113    
114     hostbyname = gethostbyname(host) 
115
116     # Add the intended host key
117     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
118     
119     # If we're not in strict mode, add user-configured keys
120     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
121         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
122         if os.access(user_hosts_path, os.R_OK):
123             f = open(user_hosts_path, "r")
124             tmp_known_hosts.write(f.read())
125             f.close()
126         
127     tmp_known_hosts.flush()
128     
129     return tmp_known_hosts
130
131 def make_control_path(agent, forward_x11):
132     ctrl_path = "/tmp/nepi_ssh"
133
134     if agent:
135         ctrl_path +="_a"
136
137     if forward_x11:
138         ctrl_path +="_x"
139
140     ctrl_path += "-%r@%h:%p"
141
142     return ctrl_path
143
144 def shell_escape(s):
145     """ Escapes strings so that they are safe to use as command-line 
146     arguments """
147     if SHELL_SAFE.match(s):
148         # safe string - no escaping needed
149         return s
150     else:
151         # unsafe string - escape
152         def escp(c):
153             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
154                 return c
155             else:
156                 return "'$'\\x%02x''" % (ord(c),)
157         s = ''.join(map(escp,s))
158         return "'%s'" % (s,)
159
160 def eintr_retry(func):
161     """Retries a function invocation when a EINTR occurs"""
162     import functools
163     @functools.wraps(func)
164     def rv(*p, **kw):
165         retry = kw.pop("_retry", False)
166         for i in xrange(0 if retry else 4):
167             try:
168                 return func(*p, **kw)
169             except (select.error, socket.error), args:
170                 if args[0] == errno.EINTR:
171                     continue
172                 else:
173                     raise 
174             except OSError, e:
175                 if e.errno == errno.EINTR:
176                     continue
177                 else:
178                     raise
179         else:
180             return func(*p, **kw)
181     return rv
182
183 def rexec(command, host, user, 
184         port = None, 
185         agent = True,
186         sudo = False,
187         stdin = None,
188         identity = None,
189         server_key = None,
190         env = None,
191         tty = False,
192         timeout = None,
193         retry = 3,
194         err_on_timeout = True,
195         connect_timeout = 30,
196         persistent = True,
197         forward_x11 = False,
198         strict_host_checking = True):
199     """
200     Executes a remote command, returns ((stdout,stderr),process)
201     """
202     
203     tmp_known_hosts = None
204     hostip = gethostbyname(host)
205
206     args = ['ssh', '-C',
207             # Don't bother with localhost. Makes test easier
208             '-o', 'NoHostAuthenticationForLocalhost=yes',
209             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
210             '-o', 'ConnectionAttempts=3',
211             '-o', 'ServerAliveInterval=30',
212             '-o', 'TCPKeepAlive=yes',
213             '-l', user, hostip or host]
214
215     if persistent and openssh_has_persist():
216         args.extend([
217             '-o', 'ControlMaster=auto',
218             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
219             '-o', 'ControlPersist=60' ])
220
221     if not strict_host_checking:
222         # Do not check for Host key. Unsafe.
223         args.extend(['-o', 'StrictHostKeyChecking=no'])
224
225     if agent:
226         args.append('-A')
227
228     if port:
229         args.append('-p%d' % port)
230
231     if identity:
232         args.extend(('-i', identity))
233
234     if tty:
235         args.append('-t')
236         args.append('-t')
237
238     if forward_x11:
239         args.append('-X')
240
241     if server_key:
242         # Create a temporary server key file
243         tmp_known_hosts = make_server_key_args(server_key, host, port)
244         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
245
246     args.append(command)
247
248     for x in xrange(retry):
249         # connects to the remote host and starts a remote connection
250         proc = subprocess.Popen(args,
251                 env = env,
252                 stdout = subprocess.PIPE,
253                 stdin = subprocess.PIPE, 
254                 stderr = subprocess.PIPE)
255         
256         # attach tempfile object to the process, to make sure the file stays
257         # alive until the process is finished with it
258         proc._known_hosts = tmp_known_hosts
259     
260         try:
261             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
262             msg = " rexec - host %s - command %s " % (host, " ".join(args))
263             log(msg, logging.DEBUG, out, err)
264
265             if proc.poll():
266                 skip = False
267
268                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
269                     # SSH error, can safely retry
270                     skip = True 
271                 elif retry:
272                     # Probably timed out or plain failed but can retry
273                     skip = True 
274                 
275                 if skip:
276                     t = x*2
277                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
278                             t, x, host, " ".join(args))
279                     log(msg, logging.DEBUG)
280
281                     time.sleep(t)
282                     continue
283             break
284         except RuntimeError, e:
285             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
286             log(msg, logging.DEBUG, out, err)
287
288             if retry <= 0:
289                 raise
290             retry -= 1
291         
292     return ((out, err), proc)
293
294 def rcopy(source, dest,
295         port = None, 
296         agent = True, 
297         recursive = False,
298         identity = None,
299         server_key = None,
300         strict_host_checking = True):
301     """
302     Copies from/to remote sites.
303     
304     Source and destination should have the user and host encoded
305     as per scp specs.
306     
307     If source is a file object, a special mode will be used to
308     create the remote file with the same contents.
309     
310     If dest is a file object, the remote file (source) will be
311     read and written into dest.
312     
313     In these modes, recursive cannot be True.
314     
315     Source can be a list of files to copy to a single destination,
316     in which case it is advised that the destination be a folder.
317     """
318     
319     msg = " rcopy - scp %s %s " % (source, dest)
320     log(msg, logging.DEBUG)
321     
322     if isinstance(source, file) and source.tell() == 0:
323         source = source.name
324     elif hasattr(source, 'read'):
325         tmp = tempfile.NamedTemporaryFile()
326         while True:
327             buf = source.read(65536)
328             if buf:
329                 tmp.write(buf)
330             else:
331                 break
332         tmp.seek(0)
333         source = tmp.name
334     
335     if isinstance(source, file) or isinstance(dest, file) \
336             or hasattr(source, 'read')  or hasattr(dest, 'write'):
337         assert not recursive
338         
339         # Parse source/destination as <user>@<server>:<path>
340         if isinstance(dest, basestring) and ':' in dest:
341             remspec, path = dest.split(':',1)
342         elif isinstance(source, basestring) and ':' in source:
343             remspec, path = source.split(':',1)
344         else:
345             raise ValueError, "Both endpoints cannot be local"
346         user,host = remspec.rsplit('@',1)
347         
348         tmp_known_hosts = None
349         hostip = gethostbyname(host)
350         
351         args = ['ssh', '-l', user, '-C',
352                 # Don't bother with localhost. Makes test easier
353                 '-o', 'NoHostAuthenticationForLocalhost=yes',
354                 '-o', 'ConnectTimeout=60',
355                 '-o', 'ConnectionAttempts=3',
356                 '-o', 'ServerAliveInterval=30',
357                 '-o', 'TCPKeepAlive=yes',
358                 hostip or host ]
359
360         if openssh_has_persist():
361             args.extend([
362                 '-o', 'ControlMaster=auto',
363                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
364                 '-o', 'ControlPersist=60' ])
365
366         if port:
367             args.append('-P%d' % port)
368
369         if identity:
370             args.extend(('-i', identity))
371
372         if server_key:
373             # Create a temporary server key file
374             tmp_known_hosts = make_server_key_args(server_key, host, port)
375             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
376         
377         if isinstance(source, file) or hasattr(source, 'read'):
378             args.append('cat > %s' % (shell_escape(path),))
379         elif isinstance(dest, file) or hasattr(dest, 'write'):
380             args.append('cat %s' % (shell_escape(path),))
381         else:
382             raise AssertionError, "Unreachable code reached! :-Q"
383         
384         # connects to the remote host and starts a remote connection
385         if isinstance(source, file):
386             proc = subprocess.Popen(args, 
387                     stdout = open('/dev/null','w'),
388                     stderr = subprocess.PIPE,
389                     stdin = source)
390             err = proc.stderr.read()
391             proc._known_hosts = tmp_known_hosts
392             eintr_retry(proc.wait)()
393             return ((None,err), proc)
394         elif isinstance(dest, file):
395             proc = subprocess.Popen(args, 
396                     stdout = open('/dev/null','w'),
397                     stderr = subprocess.PIPE,
398                     stdin = source)
399             err = proc.stderr.read()
400             proc._known_hosts = tmp_known_hosts
401             eintr_retry(proc.wait)()
402             return ((None,err), proc)
403         elif hasattr(source, 'read'):
404             # file-like (but not file) source
405             proc = subprocess.Popen(args, 
406                     stdout = open('/dev/null','w'),
407                     stderr = subprocess.PIPE,
408                     stdin = subprocess.PIPE)
409             
410             buf = None
411             err = []
412             while True:
413                 if not buf:
414                     buf = source.read(4096)
415                 if not buf:
416                     #EOF
417                     break
418                 
419                 rdrdy, wrdy, broken = select.select(
420                     [proc.stderr],
421                     [proc.stdin],
422                     [proc.stderr,proc.stdin])
423                 
424                 if proc.stderr in rdrdy:
425                     # use os.read for fully unbuffered behavior
426                     err.append(os.read(proc.stderr.fileno(), 4096))
427                 
428                 if proc.stdin in wrdy:
429                     proc.stdin.write(buf)
430                     buf = None
431                 
432                 if broken:
433                     break
434             proc.stdin.close()
435             err.append(proc.stderr.read())
436                 
437             proc._known_hosts = tmp_known_hosts
438             eintr_retry(proc.wait)()
439             return ((None,''.join(err)), proc)
440         elif hasattr(dest, 'write'):
441             # file-like (but not file) dest
442             proc = subprocess.Popen(args, 
443                     stdout = subprocess.PIPE,
444                     stderr = subprocess.PIPE,
445                     stdin = open('/dev/null','w'))
446             
447             buf = None
448             err = []
449             while True:
450                 rdrdy, wrdy, broken = select.select(
451                     [proc.stderr, proc.stdout],
452                     [],
453                     [proc.stderr, proc.stdout])
454                 
455                 if proc.stderr in rdrdy:
456                     # use os.read for fully unbuffered behavior
457                     err.append(os.read(proc.stderr.fileno(), 4096))
458                 
459                 if proc.stdout in rdrdy:
460                     # use os.read for fully unbuffered behavior
461                     buf = os.read(proc.stdout.fileno(), 4096)
462                     dest.write(buf)
463                     
464                     if not buf:
465                         #EOF
466                         break
467                 
468                 if broken:
469                     break
470             err.append(proc.stderr.read())
471                 
472             proc._known_hosts = tmp_known_hosts
473             eintr_retry(proc.wait)()
474             return ((None,''.join(err)), proc)
475         else:
476             raise AssertionError, "Unreachable code reached! :-Q"
477     else:
478         # Parse destination as <user>@<server>:<path>
479         if isinstance(dest, basestring) and ':' in dest:
480             remspec, path = dest.split(':',1)
481         elif isinstance(source, basestring) and ':' in source:
482             remspec, path = source.split(':',1)
483         else:
484             raise ValueError, "Both endpoints cannot be local"
485         user,host = remspec.rsplit('@',1)
486         
487         # plain scp
488         tmp_known_hosts = None
489
490         args = ['scp', '-q', '-p', '-C',
491                 # Don't bother with localhost. Makes test easier
492                 '-o', 'NoHostAuthenticationForLocalhost=yes',
493                 '-o', 'ConnectTimeout=60',
494                 '-o', 'ConnectionAttempts=3',
495                 '-o', 'ServerAliveInterval=30',
496                 '-o', 'TCPKeepAlive=yes' ]
497                 
498         if port:
499             args.append('-P%d' % port)
500
501         if recursive:
502             args.append('-r')
503
504         if identity:
505             args.extend(('-i', identity))
506
507         if server_key:
508             # Create a temporary server key file
509             tmp_known_hosts = make_server_key_args(server_key, host, port)
510             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
511
512         if not strict_host_checking:
513             # Do not check for Host key. Unsafe.
514             args.extend(['-o', 'StrictHostKeyChecking=no'])
515
516         if isinstance(source,list):
517             args.extend(source)
518         else:
519             if openssh_has_persist():
520                 args.extend([
521                     '-o', 'ControlMaster=auto',
522                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
523                     ])
524             args.append(source)
525
526         args.append(dest)
527
528         # connects to the remote host and starts a remote connection
529         proc = subprocess.Popen(args, 
530                 stdout = subprocess.PIPE,
531                 stdin = subprocess.PIPE, 
532                 stderr = subprocess.PIPE)
533         proc._known_hosts = tmp_known_hosts
534         
535         (out, err) = proc.communicate()
536         eintr_retry(proc.wait)()
537         return ((out, err), proc)
538
539 def rspawn(command, pidfile, 
540         stdout = '/dev/null', 
541         stderr = STDOUT, 
542         stdin = '/dev/null', 
543         home = None, 
544         create_home = False, 
545         sudo = False,
546         host = None, 
547         port = None, 
548         user = None, 
549         agent = None, 
550         identity = None, 
551         server_key = None,
552         tty = False):
553     """
554     Spawn a remote command such that it will continue working asynchronously.
555     
556     Parameters:
557         command: the command to run - it should be a single line.
558         
559         pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
560         
561         stdout: path of a file to redirect standard output to - must be a string.
562             Defaults to /dev/null
563         stderr: path of a file to redirect standard error to - string or the special STDOUT value
564             to redirect to the same file stdout was redirected to. Defaults to STDOUT.
565         stdin: path of a file with input to be piped into the command's standard input
566         
567         home: path of a folder to use as working directory - should exist, unless you specify create_home
568         
569         create_home: if True, the home folder will be created first with mkdir -p
570         
571         sudo: whether the command needs to be executed as root
572         
573         host/port/user/agent/identity: see rexec
574     
575     Returns:
576         (stdout, stderr), process
577         
578         Of the spawning process, which only captures errors at spawning time.
579         Usually only useful for diagnostics.
580     """
581     # Start process in a "daemonized" way, using nohup and heavy
582     # stdin/out redirection to avoid connection issues
583     if stderr is STDOUT:
584         stderr = '&1'
585     else:
586         stderr = ' ' + stderr
587     
588     daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
589         'command' : command,
590         'pidfile' : shell_escape(pidfile),
591         'stdout' : stdout,
592         'stderr' : stderr,
593         'stdin' : stdin,
594     }
595     
596     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
597             'command' : shell_escape(daemon_command),
598             'sudo' : 'sudo -S' if sudo else '',
599             'pidfile' : shell_escape(pidfile),
600             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
601             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
602         }
603
604     (out,err),proc = rexec(
605         cmd,
606         host = host,
607         port = port,
608         user = user,
609         agent = agent,
610         identity = identity,
611         server_key = server_key,
612         tty = tty ,
613         )
614     
615     if proc.wait():
616         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
617
618     return ((out, err), proc)
619
620 @eintr_retry
621 def rcheckpid(pidfile,
622         host = None, 
623         port = None, 
624         user = None, 
625         agent = None, 
626         identity = None,
627         server_key = None):
628     """
629     Check the pidfile of a process spawned with remote_spawn.
630     
631     Parameters:
632         pidfile: the pidfile passed to remote_span
633         
634         host/port/user/agent/identity: see rexec
635     
636     Returns:
637         
638         A (pid, ppid) tuple useful for calling remote_status and remote_kill,
639         or None if the pidfile isn't valid yet (maybe the process is still starting).
640     """
641
642     (out,err),proc = rexec(
643         "cat %(pidfile)s" % {
644             'pidfile' : pidfile,
645         },
646         host = host,
647         port = port,
648         user = user,
649         agent = agent,
650         identity = identity,
651         server_key = server_key
652         )
653         
654     if proc.wait():
655         return None
656     
657     if out:
658         try:
659             return map(int,out.strip().split(' ',1))
660         except:
661             # Ignore, many ways to fail that don't matter that much
662             return None
663
664 @eintr_retry
665 def rstatus(pid, ppid, 
666         host = None, 
667         port = None, 
668         user = None, 
669         agent = None, 
670         identity = None,
671         server_key = None):
672     """
673     Check the status of a process spawned with remote_spawn.
674     
675     Parameters:
676         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
677         
678         host/port/user/agent/identity: see rexec
679     
680     Returns:
681         
682         One of NOT_STARTED, RUNNING, FINISHED
683     """
684
685     (out,err),proc = rexec(
686         # Check only by pid. pid+ppid does not always work (especially with sudo) 
687         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
688             'ppid' : ppid,
689             'pid' : pid,
690         },
691         host = host,
692         port = port,
693         user = user,
694         agent = agent,
695         identity = identity,
696         server_key = server_key
697         )
698     
699     if proc.wait():
700         return NOT_STARTED
701     
702     status = False
703     if err:
704         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
705             status = True
706     elif out:
707         status = (out.strip() == 'wait')
708     else:
709         return NOT_STARTED
710     return RUNNING if status else FINISHED
711
712 @eintr_retry
713 def rkill(pid, ppid,
714         host = None, 
715         port = None, 
716         user = None, 
717         agent = None, 
718         sudo = False,
719         identity = None, 
720         server_key = None, 
721         nowait = False):
722     """
723     Kill a process spawned with remote_spawn.
724     
725     First tries a SIGTERM, and if the process does not end in 10 seconds,
726     it sends a SIGKILL.
727     
728     Parameters:
729         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
730         
731         sudo: whether the command was run with sudo - careful killing like this.
732         
733         host/port/user/agent/identity: see rexec
734     
735     Returns:
736         
737         Nothing, should have killed the process
738     """
739     
740     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
741     cmd = """
742 SUBKILL="%(subkill)s" ;
743 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
744 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
745 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
746     sleep 0.2 
747     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
748         break
749     else
750         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
751         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
752     fi
753     sleep 1.8
754 done
755 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
756     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
757     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
758 fi
759 """
760     if nowait:
761         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
762
763     (out,err),proc = rexec(
764         cmd % {
765             'ppid' : ppid,
766             'pid' : pid,
767             'sudo' : 'sudo -S' if sudo else '',
768             'subkill' : subkill,
769         },
770         host = host,
771         port = port,
772         user = user,
773         agent = agent,
774         identity = identity,
775         server_key = server_key
776         )
777     
778     # wait, don't leave zombies around
779     proc.wait()
780
781     return (out, err), proc
782
783 # POSIX
784 def _communicate(self, input, timeout=None, err_on_timeout=True):
785     read_set = []
786     write_set = []
787     stdout = None # Return
788     stderr = None # Return
789     
790     killed = False
791     
792     if timeout is not None:
793         timelimit = time.time() + timeout
794         killtime = timelimit + 4
795         bailtime = timelimit + 4
796
797     if self.stdin:
798         # Flush stdio buffer.  This might block, if the user has
799         # been writing to .stdin in an uncontrolled fashion.
800         self.stdin.flush()
801         if input:
802             write_set.append(self.stdin)
803         else:
804             self.stdin.close()
805     if self.stdout:
806         read_set.append(self.stdout)
807         stdout = []
808     if self.stderr:
809         read_set.append(self.stderr)
810         stderr = []
811
812     input_offset = 0
813     while read_set or write_set:
814         if timeout is not None:
815             curtime = time.time()
816             if timeout is None or curtime > timelimit:
817                 if curtime > bailtime:
818                     break
819                 elif curtime > killtime:
820                     signum = signal.SIGKILL
821                 else:
822                     signum = signal.SIGTERM
823                 # Lets kill it
824                 os.kill(self.pid, signum)
825                 select_timeout = 0.5
826             else:
827                 select_timeout = timelimit - curtime + 0.1
828         else:
829             select_timeout = 1.0
830         
831         if select_timeout > 1.0:
832             select_timeout = 1.0
833             
834         try:
835             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
836         except select.error,e:
837             if e[0] != 4:
838                 raise
839             else:
840                 continue
841         
842         if not rlist and not wlist and not xlist and self.poll() is not None:
843             # timeout and process exited, say bye
844             break
845
846         if self.stdin in wlist:
847             # When select has indicated that the file is writable,
848             # we can write up to PIPE_BUF bytes without risk
849             # blocking.  POSIX defines PIPE_BUF >= 512
850             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
851             input_offset += bytes_written
852             if input_offset >= len(input):
853                 self.stdin.close()
854                 write_set.remove(self.stdin)
855
856         if self.stdout in rlist:
857             data = os.read(self.stdout.fileno(), 1024)
858             if data == "":
859                 self.stdout.close()
860                 read_set.remove(self.stdout)
861             stdout.append(data)
862
863         if self.stderr in rlist:
864             data = os.read(self.stderr.fileno(), 1024)
865             if data == "":
866                 self.stderr.close()
867                 read_set.remove(self.stderr)
868             stderr.append(data)
869     
870     # All data exchanged.  Translate lists into strings.
871     if stdout is not None:
872         stdout = ''.join(stdout)
873     if stderr is not None:
874         stderr = ''.join(stderr)
875
876     # Translate newlines, if requested.  We cannot let the file
877     # object do the translation: It is based on stdio, which is
878     # impossible to combine with select (unless forcing no
879     # buffering).
880     if self.universal_newlines and hasattr(file, 'newlines'):
881         if stdout:
882             stdout = self._translate_newlines(stdout)
883         if stderr:
884             stderr = self._translate_newlines(stderr)
885
886     if killed and err_on_timeout:
887         errcode = self.poll()
888         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
889     else:
890         if killed:
891             self.poll()
892         else:
893             self.wait()
894         return (stdout, stderr)
895