Added example for Linux Application using CCNx
[nepi.git] / src / neco / util / sshfuncs.py
1 import base64
2 import errno
3 import hashlib
4 import logging
5 import os
6 import os.path
7 import re
8 import select
9 import signal
10 import socket
11 import subprocess
12 import time
13 import tempfile
14
15
16 logger = logging.getLogger("sshfuncs")
17
18 def log(msg, level, out = None, err = None):
19     if out:
20         msg += " - OUT: %s " % out
21
22     if err:
23         msg += " - ERROR: %s " % err
24
25     logger.log(level, msg)
26
27
28 if hasattr(os, "devnull"):
29     DEV_NULL = os.devnull
30 else:
31     DEV_NULL = "/dev/null"
32
33 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
34
35 class STDOUT: 
36     """
37     Special value that when given to rspawn in stderr causes stderr to 
38     redirect to whatever stdout was redirected to.
39     """
40
41 class RUNNING:
42     """
43     Process is still running
44     """
45
46 class FINISHED:
47     """
48     Process is finished
49     """
50
51 class NOT_STARTED:
52     """
53     Process hasn't started running yet (this should be very rare)
54     """
55
56 hostbyname_cache = dict()
57
58 def gethostbyname(host):
59     hostbyname = hostbyname_cache.get(host)
60     if not hostbyname:
61         hostbyname = socket.gethostbyname(host)
62         hostbyname_cache[host] = hostbyname
63     return hostbyname
64
65 OPENSSH_HAS_PERSIST = None
66
67 def openssh_has_persist():
68     """ The ssh_config options ControlMaster and ControlPersist allow to
69     reuse a same network connection for multiple ssh sessions. In this 
70     way limitations on number of open ssh connections can be bypassed.
71     However, older versions of openSSH do not support this feature.
72     This function is used to determine if ssh connection persist features
73     can be used.
74     """
75     global OPENSSH_HAS_PERSIST
76     if OPENSSH_HAS_PERSIST is None:
77         proc = subprocess.Popen(["ssh","-v"],
78             stdout = subprocess.PIPE,
79             stderr = subprocess.STDOUT,
80             stdin = open("/dev/null","r") )
81         out,err = proc.communicate()
82         proc.wait()
83         
84         vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
85         OPENSSH_HAS_PERSIST = bool(vre.match(out))
86     return OPENSSH_HAS_PERSIST
87
88 def make_server_key_args(server_key, host, port):
89     """ Returns a reference to a temporary known_hosts file, to which 
90     the server key has been added. 
91     
92     Make sure to hold onto the temp file reference until the process is 
93     done with it
94
95     :param server_key: the server public key
96     :type server_key: str
97
98     :param host: the hostname
99     :type host: str
100
101     :param port: the ssh port
102     :type port: str
103
104     """
105     if port is not None:
106         host = '%s:%s' % (host, str(port))
107
108     # Create a temporary server key file
109     tmp_known_hosts = tempfile.NamedTemporaryFile()
110    
111     hostbyname = gethostbyname(host) 
112
113     # Add the intended host key
114     tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
115     
116     # If we're not in strict mode, add user-configured keys
117     if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
118         user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
119         if os.access(user_hosts_path, os.R_OK):
120             f = open(user_hosts_path, "r")
121             tmp_known_hosts.write(f.read())
122             f.close()
123         
124     tmp_known_hosts.flush()
125     
126     return tmp_known_hosts
127
128 def make_control_path(agent, forward_x11):
129     ctrl_path = "/tmp/nepi_ssh"
130
131     if agent:
132         ctrl_path +="_a"
133
134     if forward_x11:
135         ctrl_path +="_x"
136
137     ctrl_path += "-%r@%h:%p"
138
139     return ctrl_path
140
141 def shell_escape(s):
142     """ Escapes strings so that they are safe to use as command-line 
143     arguments """
144     if SHELL_SAFE.match(s):
145         # safe string - no escaping needed
146         return s
147     else:
148         # unsafe string - escape
149         def escp(c):
150             if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
151                 return c
152             else:
153                 return "'$'\\x%02x''" % (ord(c),)
154         s = ''.join(map(escp,s))
155         return "'%s'" % (s,)
156
157 def eintr_retry(func):
158     """Retries a function invocation when a EINTR occurs"""
159     import functools
160     @functools.wraps(func)
161     def rv(*p, **kw):
162         retry = kw.pop("_retry", False)
163         for i in xrange(0 if retry else 4):
164             try:
165                 return func(*p, **kw)
166             except (select.error, socket.error), args:
167                 if args[0] == errno.EINTR:
168                     continue
169                 else:
170                     raise 
171             except OSError, e:
172                 if e.errno == errno.EINTR:
173                     continue
174                 else:
175                     raise
176         else:
177             return func(*p, **kw)
178     return rv
179
180 def rexec(command, host, user, 
181         port = None, 
182         agent = True,
183         sudo = False,
184         stdin = None,
185         identity = None,
186         server_key = None,
187         env = None,
188         tty = False,
189         timeout = None,
190         retry = 3,
191         err_on_timeout = True,
192         connect_timeout = 30,
193         persistent = True,
194         forward_x11 = False):
195     """
196     Executes a remote command, returns ((stdout,stderr),process)
197     """
198     
199     tmp_known_hosts = None
200     hostip = gethostbyname(host)
201
202     args = ['ssh', '-C',
203             # Don't bother with localhost. Makes test easier
204             '-o', 'NoHostAuthenticationForLocalhost=yes',
205             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
206             '-o', 'ConnectionAttempts=3',
207             '-o', 'ServerAliveInterval=30',
208             '-o', 'TCPKeepAlive=yes',
209             '-l', user, hostip or host]
210
211     if persistent and openssh_has_persist():
212         args.extend([
213             '-o', 'ControlMaster=auto',
214             '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
215             '-o', 'ControlPersist=60' ])
216
217     if agent:
218         args.append('-A')
219
220     if port:
221         args.append('-p%d' % port)
222
223     if identity:
224         args.extend(('-i', identity))
225
226     if tty:
227         args.append('-t')
228         args.append('-t')
229
230     if forward_x11:
231         args.append('-X')
232
233     if server_key:
234         # Create a temporary server key file
235         tmp_known_hosts = make_server_key_args(server_key, host, port)
236         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
237
238     args.append(command)
239
240     for x in xrange(retry):
241         # connects to the remote host and starts a remote connection
242         proc = subprocess.Popen(args,
243                 env = env,
244                 stdout = subprocess.PIPE,
245                 stdin = subprocess.PIPE, 
246                 stderr = subprocess.PIPE)
247         
248         # attach tempfile object to the process, to make sure the file stays
249         # alive until the process is finished with it
250         proc._known_hosts = tmp_known_hosts
251     
252         try:
253             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
254             msg = " rexec - host %s - command %s " % (host, " ".join(args))
255             log(msg, logging.DEBUG, out, err)
256
257             if proc.poll():
258                 skip = False
259
260                 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
261                     # SSH error, can safely retry
262                     skip = True 
263                 elif retry:
264                     # Probably timed out or plain failed but can retry
265                     skip = True 
266                 
267                 if skip:
268                     t = x*2
269                     msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
270                             t, x, host, " ".join(args))
271                     log(msg, logging.DEBUG)
272
273                     time.sleep(t)
274                     continue
275             break
276         except RuntimeError, e:
277             msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
278             log(msg, logging.DEBUG, out, err)
279
280             if retry <= 0:
281                 raise
282             retry -= 1
283         
284     return ((out, err), proc)
285
286 def rcopy(source, dest,
287         port = None, 
288         agent = True, 
289         recursive = False,
290         identity = None,
291         server_key = None):
292     """
293     Copies from/to remote sites.
294     
295     Source and destination should have the user and host encoded
296     as per scp specs.
297     
298     If source is a file object, a special mode will be used to
299     create the remote file with the same contents.
300     
301     If dest is a file object, the remote file (source) will be
302     read and written into dest.
303     
304     In these modes, recursive cannot be True.
305     
306     Source can be a list of files to copy to a single destination,
307     in which case it is advised that the destination be a folder.
308     """
309     
310     msg = " rcopy - scp %s %s " % (source, dest)
311     log(msg, logging.DEBUG)
312     
313     if isinstance(source, file) and source.tell() == 0:
314         source = source.name
315     elif hasattr(source, 'read'):
316         tmp = tempfile.NamedTemporaryFile()
317         while True:
318             buf = source.read(65536)
319             if buf:
320                 tmp.write(buf)
321             else:
322                 break
323         tmp.seek(0)
324         source = tmp.name
325     
326     if isinstance(source, file) or isinstance(dest, file) \
327             or hasattr(source, 'read')  or hasattr(dest, 'write'):
328         assert not recursive
329         
330         # Parse source/destination as <user>@<server>:<path>
331         if isinstance(dest, basestring) and ':' in dest:
332             remspec, path = dest.split(':',1)
333         elif isinstance(source, basestring) and ':' in source:
334             remspec, path = source.split(':',1)
335         else:
336             raise ValueError, "Both endpoints cannot be local"
337         user,host = remspec.rsplit('@',1)
338         
339         tmp_known_hosts = None
340         hostip = gethostbyname(host)
341         
342         args = ['ssh', '-l', user, '-C',
343                 # Don't bother with localhost. Makes test easier
344                 '-o', 'NoHostAuthenticationForLocalhost=yes',
345                 '-o', 'ConnectTimeout=60',
346                 '-o', 'ConnectionAttempts=3',
347                 '-o', 'ServerAliveInterval=30',
348                 '-o', 'TCPKeepAlive=yes',
349                 hostip or host ]
350
351         if openssh_has_persist():
352             args.extend([
353                 '-o', 'ControlMaster=auto',
354                 '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
355                 '-o', 'ControlPersist=60' ])
356
357         if port:
358             args.append('-P%d' % port)
359
360         if identity:
361             args.extend(('-i', identity))
362
363         if server_key:
364             # Create a temporary server key file
365             tmp_known_hosts = make_server_key_args(server_key, host, port)
366             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
367         
368         if isinstance(source, file) or hasattr(source, 'read'):
369             args.append('cat > %s' % (shell_escape(path),))
370         elif isinstance(dest, file) or hasattr(dest, 'write'):
371             args.append('cat %s' % (shell_escape(path),))
372         else:
373             raise AssertionError, "Unreachable code reached! :-Q"
374         
375         # connects to the remote host and starts a remote connection
376         if isinstance(source, file):
377             proc = subprocess.Popen(args, 
378                     stdout = open('/dev/null','w'),
379                     stderr = subprocess.PIPE,
380                     stdin = source)
381             err = proc.stderr.read()
382             proc._known_hosts = tmp_known_hosts
383             eintr_retry(proc.wait)()
384             return ((None,err), proc)
385         elif isinstance(dest, file):
386             proc = subprocess.Popen(args, 
387                     stdout = open('/dev/null','w'),
388                     stderr = subprocess.PIPE,
389                     stdin = source)
390             err = proc.stderr.read()
391             proc._known_hosts = tmp_known_hosts
392             eintr_retry(proc.wait)()
393             return ((None,err), proc)
394         elif hasattr(source, 'read'):
395             # file-like (but not file) source
396             proc = subprocess.Popen(args, 
397                     stdout = open('/dev/null','w'),
398                     stderr = subprocess.PIPE,
399                     stdin = subprocess.PIPE)
400             
401             buf = None
402             err = []
403             while True:
404                 if not buf:
405                     buf = source.read(4096)
406                 if not buf:
407                     #EOF
408                     break
409                 
410                 rdrdy, wrdy, broken = select.select(
411                     [proc.stderr],
412                     [proc.stdin],
413                     [proc.stderr,proc.stdin])
414                 
415                 if proc.stderr in rdrdy:
416                     # use os.read for fully unbuffered behavior
417                     err.append(os.read(proc.stderr.fileno(), 4096))
418                 
419                 if proc.stdin in wrdy:
420                     proc.stdin.write(buf)
421                     buf = None
422                 
423                 if broken:
424                     break
425             proc.stdin.close()
426             err.append(proc.stderr.read())
427                 
428             proc._known_hosts = tmp_known_hosts
429             eintr_retry(proc.wait)()
430             return ((None,''.join(err)), proc)
431         elif hasattr(dest, 'write'):
432             # file-like (but not file) dest
433             proc = subprocess.Popen(args, 
434                     stdout = subprocess.PIPE,
435                     stderr = subprocess.PIPE,
436                     stdin = open('/dev/null','w'))
437             
438             buf = None
439             err = []
440             while True:
441                 rdrdy, wrdy, broken = select.select(
442                     [proc.stderr, proc.stdout],
443                     [],
444                     [proc.stderr, proc.stdout])
445                 
446                 if proc.stderr in rdrdy:
447                     # use os.read for fully unbuffered behavior
448                     err.append(os.read(proc.stderr.fileno(), 4096))
449                 
450                 if proc.stdout in rdrdy:
451                     # use os.read for fully unbuffered behavior
452                     buf = os.read(proc.stdout.fileno(), 4096)
453                     dest.write(buf)
454                     
455                     if not buf:
456                         #EOF
457                         break
458                 
459                 if broken:
460                     break
461             err.append(proc.stderr.read())
462                 
463             proc._known_hosts = tmp_known_hosts
464             eintr_retry(proc.wait)()
465             return ((None,''.join(err)), proc)
466         else:
467             raise AssertionError, "Unreachable code reached! :-Q"
468     else:
469         # Parse destination as <user>@<server>:<path>
470         if isinstance(dest, basestring) and ':' in dest:
471             remspec, path = dest.split(':',1)
472         elif isinstance(source, basestring) and ':' in source:
473             remspec, path = source.split(':',1)
474         else:
475             raise ValueError, "Both endpoints cannot be local"
476         user,host = remspec.rsplit('@',1)
477         
478         # plain scp
479         tmp_known_hosts = None
480
481         args = ['scp', '-q', '-p', '-C',
482                 # Don't bother with localhost. Makes test easier
483                 '-o', 'NoHostAuthenticationForLocalhost=yes',
484                 '-o', 'ConnectTimeout=60',
485                 '-o', 'ConnectionAttempts=3',
486                 '-o', 'ServerAliveInterval=30',
487                 '-o', 'TCPKeepAlive=yes' ]
488                 
489         if port:
490             args.append('-P%d' % port)
491
492         if recursive:
493             args.append('-r')
494
495         if identity:
496             args.extend(('-i', identity))
497
498         if server_key:
499             # Create a temporary server key file
500             tmp_known_hosts = make_server_key_args(server_key, host, port)
501             args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
502
503         if isinstance(source,list):
504             args.extend(source)
505         else:
506             if openssh_has_persist():
507                 args.extend([
508                     '-o', 'ControlMaster=auto',
509                     '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
510                     ])
511             args.append(source)
512
513         args.append(dest)
514
515         # connects to the remote host and starts a remote connection
516         proc = subprocess.Popen(args, 
517                 stdout = subprocess.PIPE,
518                 stdin = subprocess.PIPE, 
519                 stderr = subprocess.PIPE)
520         proc._known_hosts = tmp_known_hosts
521         
522         (out, err) = proc.communicate()
523         eintr_retry(proc.wait)()
524         return ((out, err), proc)
525
526 def rspawn(command, pidfile, 
527         stdout = '/dev/null', 
528         stderr = STDOUT, 
529         stdin = '/dev/null', 
530         home = None, 
531         create_home = False, 
532         sudo = False,
533         host = None, 
534         port = None, 
535         user = None, 
536         agent = None, 
537         identity = None, 
538         server_key = None,
539         tty = False):
540     """
541     Spawn a remote command such that it will continue working asynchronously.
542     
543     Parameters:
544         command: the command to run - it should be a single line.
545         
546         pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
547         
548         stdout: path of a file to redirect standard output to - must be a string.
549             Defaults to /dev/null
550         stderr: path of a file to redirect standard error to - string or the special STDOUT value
551             to redirect to the same file stdout was redirected to. Defaults to STDOUT.
552         stdin: path of a file with input to be piped into the command's standard input
553         
554         home: path of a folder to use as working directory - should exist, unless you specify create_home
555         
556         create_home: if True, the home folder will be created first with mkdir -p
557         
558         sudo: whether the command needs to be executed as root
559         
560         host/port/user/agent/identity: see rexec
561     
562     Returns:
563         (stdout, stderr), process
564         
565         Of the spawning process, which only captures errors at spawning time.
566         Usually only useful for diagnostics.
567     """
568     # Start process in a "daemonized" way, using nohup and heavy
569     # stdin/out redirection to avoid connection issues
570     if stderr is STDOUT:
571         stderr = '&1'
572     else:
573         stderr = ' ' + stderr
574     
575     daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
576         'command' : command,
577         'pidfile' : shell_escape(pidfile),
578         'stdout' : stdout,
579         'stderr' : stderr,
580         'stdin' : stdin,
581     }
582     
583     cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
584             'command' : shell_escape(daemon_command),
585             'sudo' : 'sudo -S' if sudo else '',
586             'pidfile' : shell_escape(pidfile),
587             'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
588             'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
589         }
590
591     (out,err),proc = rexec(
592         cmd,
593         host = host,
594         port = port,
595         user = user,
596         agent = agent,
597         identity = identity,
598         server_key = server_key,
599         tty = tty ,
600         )
601     
602     if proc.wait():
603         raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,)
604
605     return ((out, err), proc)
606
607 @eintr_retry
608 def rcheckpid(pidfile,
609         host = None, 
610         port = None, 
611         user = None, 
612         agent = None, 
613         identity = None,
614         server_key = None):
615     """
616     Check the pidfile of a process spawned with remote_spawn.
617     
618     Parameters:
619         pidfile: the pidfile passed to remote_span
620         
621         host/port/user/agent/identity: see rexec
622     
623     Returns:
624         
625         A (pid, ppid) tuple useful for calling remote_status and remote_kill,
626         or None if the pidfile isn't valid yet (maybe the process is still starting).
627     """
628
629     (out,err),proc = rexec(
630         "cat %(pidfile)s" % {
631             'pidfile' : pidfile,
632         },
633         host = host,
634         port = port,
635         user = user,
636         agent = agent,
637         identity = identity,
638         server_key = server_key
639         )
640         
641     if proc.wait():
642         return None
643     
644     if out:
645         try:
646             return map(int,out.strip().split(' ',1))
647         except:
648             # Ignore, many ways to fail that don't matter that much
649             return None
650
651 @eintr_retry
652 def rstatus(pid, ppid, 
653         host = None, 
654         port = None, 
655         user = None, 
656         agent = None, 
657         identity = None,
658         server_key = None):
659     """
660     Check the status of a process spawned with remote_spawn.
661     
662     Parameters:
663         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
664         
665         host/port/user/agent/identity: see rexec
666     
667     Returns:
668         
669         One of NOT_STARTED, RUNNING, FINISHED
670     """
671
672     (out,err),proc = rexec(
673         # Check only by pid. pid+ppid does not always work (especially with sudo) 
674         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
675             'ppid' : ppid,
676             'pid' : pid,
677         },
678         host = host,
679         port = port,
680         user = user,
681         agent = agent,
682         identity = identity,
683         server_key = server_key
684         )
685     
686     if proc.wait():
687         return NOT_STARTED
688     
689     status = False
690     if err:
691         if err.strip().find("Error, do this: mount -t proc none /proc") >= 0:
692             status = True
693     elif out:
694         status = (out.strip() == 'wait')
695     else:
696         return NOT_STARTED
697     return RUNNING if status else FINISHED
698
699 @eintr_retry
700 def rkill(pid, ppid,
701         host = None, 
702         port = None, 
703         user = None, 
704         agent = None, 
705         sudo = False,
706         identity = None, 
707         server_key = None, 
708         nowait = False):
709     """
710     Kill a process spawned with remote_spawn.
711     
712     First tries a SIGTERM, and if the process does not end in 10 seconds,
713     it sends a SIGKILL.
714     
715     Parameters:
716         pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
717         
718         sudo: whether the command was run with sudo - careful killing like this.
719         
720         host/port/user/agent/identity: see rexec
721     
722     Returns:
723         
724         Nothing, should have killed the process
725     """
726     
727     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
728     cmd = """
729 SUBKILL="%(subkill)s" ;
730 %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
731 %(sudo)s kill %(pid)d $SUBKILL || /bin/true
732 for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
733     sleep 0.2 
734     if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
735         break
736     else
737         %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
738         %(sudo)s kill %(pid)d $SUBKILL || /bin/true
739     fi
740     sleep 1.8
741 done
742 if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
743     %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
744     %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
745 fi
746 """
747     if nowait:
748         cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
749
750     (out,err),proc = rexec(
751         cmd % {
752             'ppid' : ppid,
753             'pid' : pid,
754             'sudo' : 'sudo -S' if sudo else '',
755             'subkill' : subkill,
756         },
757         host = host,
758         port = port,
759         user = user,
760         agent = agent,
761         identity = identity,
762         server_key = server_key
763         )
764     
765     # wait, don't leave zombies around
766     proc.wait()
767
768     return (out, err), proc
769
770 # POSIX
771 def _communicate(self, input, timeout=None, err_on_timeout=True):
772     read_set = []
773     write_set = []
774     stdout = None # Return
775     stderr = None # Return
776     
777     killed = False
778     
779     if timeout is not None:
780         timelimit = time.time() + timeout
781         killtime = timelimit + 4
782         bailtime = timelimit + 4
783
784     if self.stdin:
785         # Flush stdio buffer.  This might block, if the user has
786         # been writing to .stdin in an uncontrolled fashion.
787         self.stdin.flush()
788         if input:
789             write_set.append(self.stdin)
790         else:
791             self.stdin.close()
792     if self.stdout:
793         read_set.append(self.stdout)
794         stdout = []
795     if self.stderr:
796         read_set.append(self.stderr)
797         stderr = []
798
799     input_offset = 0
800     while read_set or write_set:
801         if timeout is not None:
802             curtime = time.time()
803             if timeout is None or curtime > timelimit:
804                 if curtime > bailtime:
805                     break
806                 elif curtime > killtime:
807                     signum = signal.SIGKILL
808                 else:
809                     signum = signal.SIGTERM
810                 # Lets kill it
811                 os.kill(self.pid, signum)
812                 select_timeout = 0.5
813             else:
814                 select_timeout = timelimit - curtime + 0.1
815         else:
816             select_timeout = 1.0
817         
818         if select_timeout > 1.0:
819             select_timeout = 1.0
820             
821         try:
822             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
823         except select.error,e:
824             if e[0] != 4:
825                 raise
826             else:
827                 continue
828         
829         if not rlist and not wlist and not xlist and self.poll() is not None:
830             # timeout and process exited, say bye
831             break
832
833         if self.stdin in wlist:
834             # When select has indicated that the file is writable,
835             # we can write up to PIPE_BUF bytes without risk
836             # blocking.  POSIX defines PIPE_BUF >= 512
837             bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
838             input_offset += bytes_written
839             if input_offset >= len(input):
840                 self.stdin.close()
841                 write_set.remove(self.stdin)
842
843         if self.stdout in rlist:
844             data = os.read(self.stdout.fileno(), 1024)
845             if data == "":
846                 self.stdout.close()
847                 read_set.remove(self.stdout)
848             stdout.append(data)
849
850         if self.stderr in rlist:
851             data = os.read(self.stderr.fileno(), 1024)
852             if data == "":
853                 self.stderr.close()
854                 read_set.remove(self.stderr)
855             stderr.append(data)
856     
857     # All data exchanged.  Translate lists into strings.
858     if stdout is not None:
859         stdout = ''.join(stdout)
860     if stderr is not None:
861         stderr = ''.join(stderr)
862
863     # Translate newlines, if requested.  We cannot let the file
864     # object do the translation: It is based on stdio, which is
865     # impossible to combine with select (unless forcing no
866     # buffering).
867     if self.universal_newlines and hasattr(file, 'newlines'):
868         if stdout:
869             stdout = self._translate_newlines(stdout)
870         if stderr:
871             stderr = self._translate_newlines(stderr)
872
873     if killed and err_on_timeout:
874         errcode = self.poll()
875         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
876     else:
877         if killed:
878             self.poll()
879         else:
880             self.wait()
881         return (stdout, stderr)
882