2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
# File names used by the server inside its root directory:
# the UNIX control socket and the file that receives stdout/stderr.
CTRL_SOCK = "ctrl.sock"
STD_ERR = "stderr.log"

# Debug tracing switch, read once from the environment at import time.
TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")

# Cached answer of openssh_has_persist(); None means "not probed yet".
OPENSSH_HAS_PERSIST = None

# Path of the null device: take the platform's value when the os module
# exposes one, and fall back to the POSIX path otherwise, so the name is
# always defined.
DEV_NULL = getattr(os, "devnull", "/dev/null")

# Strings made only of these characters need no shell quoting.
SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
def openssh_has_persist():
    """
    Tells whether the local ssh client is OpenSSH 5.8 or newer.

    The answer is computed once by running the ssh binary and is cached
    in the module-level OPENSSH_HAS_PERSIST, so later calls are free.
    Callers use it to decide whether to pass ControlPersist options.

    Raises OSError if the ssh executable cannot be started.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        devnull = open("/dev/null", "r")
        try:
            # -V is the documented "print version and exit" switch;
            # the version banner is written to stderr, hence the merge.
            proc = subprocess.Popen(["ssh", "-V"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull)
            out, _ = proc.communicate()
        finally:
            # do not leak the descriptor used as the child's stdin
            devnull.close()

        if not isinstance(out, str):
            # bytes output (python 3): decode before applying a text regex
            out = out.decode("ascii", "replace")

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
60 """ Escapes strings so that they are safe to use as command-line arguments """
61 if SHELL_SAFE.match(s):
62 # safe string - no escaping needed
65 # unsafe string - escape
67 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
70 return "'$'\\x%02x''" % (ord(c),)
71 s = ''.join(map(escp,s))
74 def eintr_retry(func):
76 @functools.wraps(func)
78 retry = kw.pop("_retry", False)
79 for i in xrange(0 if retry else 4):
82 except (select.error, socket.error), args:
83 if args[0] == errno.EINTR:
88 if e.errno == errno.EINTR:
def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
        environment_setup = "", clean_root = False):
    """
    Stores the server configuration; no process or socket is created here.

    root_dir: directory the daemon works in.
    log_level: logging level; log_debug() only writes when it equals
        DC.DEBUG_LEVEL.
    environment_setup: shell snippet kept for use when the daemon
        prepares its environment (empty string means none).
    clean_root: flag consulted together with root_dir when the root
        directory is prepared.
    """
    self._root_dir = root_dir
    self._clean_root = clean_root
    # the serving loops test this flag, so it must exist from the start
    self._stop = False
    self._ctrl_sock = None
    self._log_level = log_level
    # leftover bytes of a partially received message
    self._rdbuf = ""
    self._environment_setup = environment_setup
110 self.post_daemonize()
# cannot return normally after fork because no exec was done.
115 # This means that if we don't do a os._exit(0) here the code that
116 # follows the call to "Server.run()" in the "caller code" will be
117 # executed... but by now it has already been executed after the
118 # first process (the one that did the first fork) returned.
121 print >>sys.stderr, "SERVER_ERROR."
125 print >>sys.stderr, "SERVER_READY."
128 # pipes for process synchronization
132 root = os.path.normpath(self._root_dir)
133 if self._root_dir not in [".", ""] and os.path.exists(root) \
134 and self._clean_root:
136 if not os.path.exists(root):
137 os.makedirs(root, 0755)
145 except OSError, e: # pragma: no cover
146 if e.errno == errno.EINTR:
# os.waitpid avoids leaving a <defunct> (zombie) process
153 st = os.waitpid(pid1, 0)[1]
155 raise RuntimeError("Daemonization failed")
156 # return 0 to inform the caller method that this is not the
161 # Decouple from parent environment.
162 os.chdir(self._root_dir)
169 # see ref: "os._exit(0)"
172 # close all open file descriptors.
173 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
174 if (max_fd == resource.RLIM_INFINITY):
176 for fd in range(3, max_fd):
183 # Redirect standard file descriptors.
184 stdin = open(DEV_NULL, "r")
185 stderr = stdout = open(STD_ERR, "a", 0)
186 os.dup2(stdin.fileno(), sys.stdin.fileno())
187 # NOTE: sys.stdout.write will still be buffered, even if the file
188 # was opened with 0 buffer
189 os.dup2(stdout.fileno(), sys.stdout.fileno())
190 os.dup2(stderr.fileno(), sys.stderr.fileno())
193 if self._environment_setup:
194 # parse environment variables and pass to child process
195 # do it by executing shell commands, in case there's some heavy setup involved
196 envproc = subprocess.Popen(
198 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
199 ( self._environment_setup, ) ],
200 stdin = subprocess.PIPE,
201 stdout = subprocess.PIPE,
202 stderr = subprocess.PIPE
204 out,err = envproc.communicate()
206 # parse new environment
208 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
210 # apply to current environment
211 for name, value in environment.iteritems():
212 os.environ[name] = value
215 if 'PYTHONPATH' in environment:
216 sys.path = environment['PYTHONPATH'].split(':') + sys.path
218 # create control socket
219 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
221 self._ctrl_sock.bind(CTRL_SOCK)
223 # Address in use, check pidfile
226 pidfile = open(CTRL_PID, "r")
235 # Check process liveliness
236 if not os.path.exists("/proc/%d" % (pid,)):
237 # Ok, it's dead, clean the socket
241 self._ctrl_sock.bind(CTRL_SOCK)
243 self._ctrl_sock.listen(0)
246 pidfile = open(CTRL_PID, "w")
247 pidfile.write(str(os.getpid()))
250 # let the parent process know that the daemonization is finished
def post_daemonize(self):
    """
    Adjusts process-wide state for the daemonized server: publishes the
    log level in the environment and restores the default SIGCHLD handler.
    """
    level = self._log_level
    os.environ["NEPI_CONTROLLER_LOGLEVEL"] = level

    # Why SIGCHLD is reset:
    # QT replaces the SIGCHLD handler with one that writes a \0 to some
    # file descriptor 'x' every time a SIGCHLD arrives. Daemonization
    # closes every descriptor from 3 upwards, but the handler inherited
    # by the forked process keeps writing to 'x', which may have been
    # reused for something else by then. When 'x' ends up carrying
    # communication traffic, stray \0 bytes show up inside the messages;
    # this is exactly what happens with netns in daemonized form.
    # The only way out is to put the default handler back.
    default_handler = signal.SIG_DFL
    signal.signal(signal.SIGCHLD, default_handler)
271 while not self._stop:
272 conn, addr = self._ctrl_sock.accept()
273 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
275 while not self._stop:
277 msg = self.recv_msg(conn)
278 except socket.timeout, e:
279 #self.log_error("SERVER recv_msg: connection timedout ")
283 self.log_error("CONNECTION LOST")
288 reply = self.stop_action()
290 reply = self.reply_action(msg)
293 self.send_reply(conn, reply)
296 self.log_error("NOTICE: Awaiting for reconnection")
304 def recv_msg(self, conn):
307 while '\n' not in chunk:
309 chunk = conn.recv(1024)
310 except (OSError, socket.error), e:
311 if e[0] != errno.EINTR:
320 data = ''.join(data).split('\n',1)
323 data, self._rdbuf = data
325 decoded = base64.b64decode(data)
326 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Sends one reply over conn as a single base64-encoded line.

    conn: connected socket-like object providing sendall().
    reply: payload to encode; the line terminator is appended here.
    """
    encoded = base64.b64encode(reply)
    # send() may transmit only part of the buffer; sendall() keeps going
    # until the whole line, terminator included, has been written.
    conn.sendall(encoded + b"\n")
334 self._ctrl_sock.close()
def stop_action(self):
    """ Builds the reply text for a stop request """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """
    Builds the reply text for a regular message.

    msg: the received message; it is rendered with %s formatting.
    """
    # wrap in a 1-tuple so a tuple-valued msg is formatted as a whole
    # instead of being unpacked as the format arguments
    return "Reply to: %s" % (msg,)
def log_error(self, text = None, context = ''):
    """
    Writes a timestamped error entry to stderr and returns the logged text.

    text: message to log; when omitted, the traceback of the exception
        currently being handled is logged instead.
    context: optional label shown in parentheses after "ERROR".
    """
    if text is None:
        # no explicit message: fall back to the active exception
        text = traceback.format_exc()
    date = time.strftime("%Y-%m-%d %H:%M:%S")
    if context:
        # only decorate when a context was given, to avoid "ERROR ():"
        context = " (%s)" % (context,)
    sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
    return text
def log_debug(self, text):
    """ Writes a timestamped debug entry to stderr, only at debug level """
    if self._log_level != DC.DEBUG_LEVEL:
        # any other level: stay silent
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (stamp, text)
    sys.stderr.write(entry)
359 class Forwarder(object):
def __init__(self, root_dir = "."):
    """
    Stores the forwarder configuration; the socket is opened later.

    root_dir: directory that holds the server's control socket.
    """
    self._ctrl_sock = None
    self._root_dir = root_dir
    # the forwarding loop tests this flag, so it must exist from the start
    self._stop = False
    # leftover bytes of a partially received message
    self._rdbuf = ""
368 print >>sys.stderr, "FORWARDER_READY."
369 while not self._stop:
370 data = self.read_data()
372 # Connection to client lost
374 self.send_to_server(data)
376 data = self.recv_from_server()
378 # Connection to server lost
379 raise IOError, "Connection to server lost while "\
381 self.write_data(data)
385 return sys.stdin.readline()
def write_data(self, data):
    """
    Writes data to standard output and pushes it out immediately.

    data: text to hand over to whoever reads this process' stdout.
    """
    sys.stdout.write(data)
    # sys.stdout.write is buffered, this is why we need to do a flush()
    sys.stdout.flush()
392 def send_to_server(self, data):
394 self._ctrl_sock.send(data)
395 except (IOError, socket.error), e:
396 if e[0] == errno.EPIPE:
398 self._ctrl_sock.send(data)
401 encoded = data.rstrip()
402 msg = base64.b64decode(encoded)
406 def recv_from_server(self):
409 while '\n' not in chunk:
411 chunk = self._ctrl_sock.recv(1024)
412 except (OSError, socket.error), e:
413 if e[0] != errno.EINTR:
421 data = ''.join(data).split('\n',1)
424 data, self._rdbuf = data
430 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
431 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
432 self._ctrl_sock.connect(sock_addr)
def disconnect(self):
    """
    Closes the control socket, if one was ever opened.

    Safe to call before any connection has been made.
    """
    sock = self._ctrl_sock
    if sock is None:
        # never connected: nothing to close
        return
    sock.close()
440 class Client(object):
def __init__(self, root_dir = ".", host = None, port = None, user = None,
        agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
        environment_setup = ""):
    """
    Stores the parameters describing how to reach the server.

    root_dir: server root directory.
    host, port: kept together as the (host, port) pair in self.addr.
    user, agent, sudo: access options given by the caller, kept on the
        instance so they are not lost.
    communication: access mode constant (defaults to DC.ACCESS_LOCAL).
    environment_setup: shell snippet handed over when the forwarder
        process is started (empty string means none).
    """
    self.root_dir = root_dir
    self.addr = (host, port)
    self.user = user
    self.agent = agent
    self.sudo = sudo
    self.communication = communication
    self.environment_setup = environment_setup
    self._stopped = False
    # queue of pending deferred replies, served in order
    self._deferreds = collections.deque()
456 if self._process.poll() is None:
457 os.kill(self._process.pid, signal.SIGTERM)
461 root_dir = self.root_dir
462 (host, port) = self.addr
466 communication = self.communication
468 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
469 c.forward()" % (root_dir,)
471 self._process = popen_python(python_code,
472 communication = communication,
478 environment_setup = self.environment_setup)
480 # Wait for the forwarder to be ready, otherwise nobody
481 # will be able to connect to it
485 helo = self._process.stderr.readline()
486 if helo == 'FORWARDER_READY.\n':
490 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
492 def send_msg(self, msg):
493 encoded = base64.b64encode(msg)
494 data = "%s\n" % encoded
497 self._process.stdin.write(data)
498 except (IOError, ValueError):
499 # dead process, poll it to un-zombify
502 # try again after reconnect
503 # If it fails again, though, give up
505 self._process.stdin.write(data)
508 self.send_msg(STOP_MSG)
511 def defer_reply(self, transform=None):
513 self._deferreds.append(defer_entry)
515 functools.partial(self.read_reply, defer_entry, transform)
def _read_reply(self):
    """
    Reads one reply line from the forwarder process and decodes it.

    Returns the base64-decoded payload.
    Raises RuntimeError, carrying whatever the forwarder wrote to its
    stderr, when the forwarder's stdout has reached end of file.
    """
    data = self._process.stdout.readline()
    encoded = data.rstrip()
    if not encoded:
        # empty == eof == dead process, poll it to un-zombify
        self._process.poll()
        raise RuntimeError("Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),))
    return base64.b64decode(encoded)
528 def read_reply(self, which=None, transform=None):
529 # Test to see if someone did it already
530 if which is not None and len(which):
532 # ...just return the deferred value
534 return transform(which[0])
538 # Process all deferreds until the one we're looking for
539 # or until the queue is empty
540 while self._deferreds:
542 deferred = self._deferreds.popleft()
547 deferred.append(self._read_reply())
548 if deferred is which:
549 # We reached the one we were looking for
551 return transform(deferred[0])
556 # They've requested a synchronous read
558 return transform(self._read_reply())
560 return self._read_reply()
562 def _make_server_key_args(server_key, host, port, args):
564 Returns a reference to the created temporary file, and adds the
565 corresponding arguments to the given argument list.
567 Make sure to hold onto it until the process is done with the file
570 host = '%s:%s' % (host,port)
571 # Create a temporary server key file
572 tmp_known_hosts = tempfile.NamedTemporaryFile()
574 # Add the intended host key
575 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
577 # If we're not in strict mode, add user-configured keys
578 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
579 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
580 if os.access(user_hosts_path, os.R_OK):
581 f = open(user_hosts_path, "r")
582 tmp_known_hosts.write(f.read())
585 tmp_known_hosts.flush()
587 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
589 return tmp_known_hosts
def make_connkey(user, host, port):
    """
    Builds a short, path-safe key identifying a (user, host, port) triple.

    The key is the base64 form of repr((user, host, port)) with '/'
    replaced by '.'; when that is longer than 60 characters its SHA-1
    hex digest is used instead. Callers embed the key in the ssh
    ControlPath file name.

    Returns the key as a str.
    """
    import binascii

    raw = repr((user, host, port))
    if not isinstance(raw, bytes):
        raw = raw.encode("utf-8")

    # Same layout as the python 2 "base64" codec: 57 input bytes per
    # output line, each line newline-terminated.
    encoded = b"".join(
        binascii.b2a_base64(raw[pos:pos + 57])
        for pos in range(0, len(raw), 57))

    connkey = encoded.strip().replace(b'/', b'.')
    if len(connkey) > 60:
        connkey = hashlib.sha1(connkey).hexdigest()
    elif not isinstance(connkey, str):
        connkey = connkey.decode("ascii")
    return connkey
597 def popen_ssh_command(command, host, port, user, agent,
604 err_on_timeout = True,
605 connect_timeout = 30,
Executes a remote command, returns ((stdout,stderr),process)
612 print "ssh", host, command
614 tmp_known_hosts = None
615 connkey = make_connkey(user,host,port)
617 # Don't bother with localhost. Makes test easier
618 '-o', 'NoHostAuthenticationForLocalhost=yes',
619 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
620 '-o', 'ConnectionAttempts=3',
621 '-o', 'ServerAliveInterval=30',
622 '-o', 'TCPKeepAlive=yes',
623 '-l', user, hostip or host]
624 if persistent and openssh_has_persist():
626 '-o', 'ControlMaster=auto',
627 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
628 '-o', 'ControlPersist=60' ])
632 args.append('-p%d' % port)
634 args.extend(('-i', ident_key))
638 # Create a temporary server key file
639 tmp_known_hosts = _make_server_key_args(
640 server_key, host, port, args)
643 for x in xrange(retry or 3):
644 # connects to the remote host and starts a remote connection
645 proc = subprocess.Popen(args,
646 stdout = subprocess.PIPE,
647 stdin = subprocess.PIPE,
648 stderr = subprocess.PIPE)
650 # attach tempfile object to the process, to make sure the file stays
651 # alive until the process is finished with it
652 proc._known_hosts = tmp_known_hosts
655 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
657 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
658 # SSH error, can safely retry
661 # Probably timed out or plain failed but can retry
664 except RuntimeError,e:
668 print " timedout -> ", e.args
672 print " -> ", out, err
674 return ((out, err), proc)
676 def popen_scp(source, dest,
683 Copies from/to remote sites.
685 Source and destination should have the user and host encoded
688 If source is a file object, a special mode will be used to
689 create the remote file with the same contents.
691 If dest is a file object, the remote file (source) will be
692 read and written into dest.
694 In these modes, recursive cannot be True.
696 Source can be a list of files to copy to a single destination,
697 in which case it is advised that the destination be a folder.
701 print "scp", source, dest
703 if isinstance(source, file) and source.tell() == 0:
705 elif hasattr(source, 'read'):
706 tmp = tempfile.NamedTemporaryFile()
708 buf = source.read(65536)
716 if isinstance(source, file) or isinstance(dest, file) \
717 or hasattr(source, 'read') or hasattr(dest, 'write'):
720 # Parse source/destination as <user>@<server>:<path>
721 if isinstance(dest, basestring) and ':' in dest:
722 remspec, path = dest.split(':',1)
723 elif isinstance(source, basestring) and ':' in source:
724 remspec, path = source.split(':',1)
726 raise ValueError, "Both endpoints cannot be local"
727 user,host = remspec.rsplit('@',1)
728 tmp_known_hosts = None
730 connkey = make_connkey(user,host,port)
731 args = ['ssh', '-l', user, '-C',
732 # Don't bother with localhost. Makes test easier
733 '-o', 'NoHostAuthenticationForLocalhost=yes',
734 '-o', 'ConnectTimeout=30',
735 '-o', 'ConnectionAttempts=3',
736 '-o', 'ServerAliveInterval=30',
737 '-o', 'TCPKeepAlive=yes',
739 if openssh_has_persist():
741 '-o', 'ControlMaster=auto',
742 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
743 '-o', 'ControlPersist=60' ])
745 args.append('-P%d' % port)
747 args.extend(('-i', ident_key))
749 # Create a temporary server key file
750 tmp_known_hosts = _make_server_key_args(
751 server_key, host, port, args)
753 if isinstance(source, file) or hasattr(source, 'read'):
754 args.append('cat > %s' % (shell_escape(path),))
755 elif isinstance(dest, file) or hasattr(dest, 'write'):
756 args.append('cat %s' % (shell_escape(path),))
758 raise AssertionError, "Unreachable code reached! :-Q"
760 # connects to the remote host and starts a remote connection
761 if isinstance(source, file):
762 proc = subprocess.Popen(args,
763 stdout = open('/dev/null','w'),
764 stderr = subprocess.PIPE,
766 err = proc.stderr.read()
767 proc._known_hosts = tmp_known_hosts
768 eintr_retry(proc.wait)()
769 return ((None,err), proc)
770 elif isinstance(dest, file):
771 proc = subprocess.Popen(args,
772 stdout = open('/dev/null','w'),
773 stderr = subprocess.PIPE,
775 err = proc.stderr.read()
776 proc._known_hosts = tmp_known_hosts
777 eintr_retry(proc.wait)()
778 return ((None,err), proc)
779 elif hasattr(source, 'read'):
780 # file-like (but not file) source
781 proc = subprocess.Popen(args,
782 stdout = open('/dev/null','w'),
783 stderr = subprocess.PIPE,
784 stdin = subprocess.PIPE)
790 buf = source.read(4096)
795 rdrdy, wrdy, broken = select.select(
798 [proc.stderr,proc.stdin])
800 if proc.stderr in rdrdy:
801 # use os.read for fully unbuffered behavior
802 err.append(os.read(proc.stderr.fileno(), 4096))
804 if proc.stdin in wrdy:
805 proc.stdin.write(buf)
811 err.append(proc.stderr.read())
813 proc._known_hosts = tmp_known_hosts
814 eintr_retry(proc.wait)()
815 return ((None,''.join(err)), proc)
816 elif hasattr(dest, 'write'):
817 # file-like (but not file) dest
818 proc = subprocess.Popen(args,
819 stdout = subprocess.PIPE,
820 stderr = subprocess.PIPE,
821 stdin = open('/dev/null','w'))
826 rdrdy, wrdy, broken = select.select(
827 [proc.stderr, proc.stdout],
829 [proc.stderr, proc.stdout])
831 if proc.stderr in rdrdy:
832 # use os.read for fully unbuffered behavior
833 err.append(os.read(proc.stderr.fileno(), 4096))
835 if proc.stdout in rdrdy:
836 # use os.read for fully unbuffered behavior
837 buf = os.read(proc.stdout.fileno(), 4096)
846 err.append(proc.stderr.read())
848 proc._known_hosts = tmp_known_hosts
849 eintr_retry(proc.wait)()
850 return ((None,''.join(err)), proc)
852 raise AssertionError, "Unreachable code reached! :-Q"
854 # Parse destination as <user>@<server>:<path>
855 if isinstance(dest, basestring) and ':' in dest:
856 remspec, path = dest.split(':',1)
857 elif isinstance(source, basestring) and ':' in source:
858 remspec, path = source.split(':',1)
860 raise ValueError, "Both endpoints cannot be local"
861 user,host = remspec.rsplit('@',1)
864 tmp_known_hosts = None
865 args = ['scp', '-q', '-p', '-C',
866 # Don't bother with localhost. Makes test easier
867 '-o', 'NoHostAuthenticationForLocalhost=yes',
868 '-o', 'ConnectTimeout=30',
869 '-o', 'ConnectionAttempts=3',
870 '-o', 'ServerAliveInterval=30',
871 '-o', 'TCPKeepAlive=yes' ]
874 args.append('-P%d' % port)
878 args.extend(('-i', ident_key))
880 # Create a temporary server key file
881 tmp_known_hosts = _make_server_key_args(
882 server_key, host, port, args)
883 if isinstance(source,list):
886 if openssh_has_persist():
887 connkey = make_connkey(user,host,port)
889 '-o', 'ControlMaster=no',
890 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ) ])
894 # connects to the remote host and starts a remote connection
895 proc = subprocess.Popen(args,
896 stdout = subprocess.PIPE,
897 stdin = subprocess.PIPE,
898 stderr = subprocess.PIPE)
899 proc._known_hosts = tmp_known_hosts
901 comm = proc.communicate()
902 eintr_retry(proc.wait)()
905 def decode_and_execute():
906 # The python code we want to execute might have characters that
907 # are not compatible with the 'inline' mode we are using. To avoid
908 # problems we receive the encoded python code in base64 as a input
909 # stream and decode it for execution.
914 cmd += os.read(0, 1)# one byte from stdin
916 if e.errno == errno.EINTR:
922 cmd = base64.b64decode(cmd)
923 # Uncomment for debug
924 #os.write(2, "Executing python code: %s\n" % cmd)
925 os.write(1, "OK\n") # send a sync message
928 def popen_python(python_code,
929 communication = DC.ACCESS_LOCAL,
939 environment_setup = ""):
943 python_path.replace("'", r"'\''")
944 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
946 if environment_setup:
947 cmd += environment_setup
949 # Uncomment for debug (to run everything under strace)
950 # We had to verify if strace works (cannot nest them)
951 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
953 #cmd += "strace -f -tt -s 200 -o strace$$.out "
955 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
956 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
961 cmd = "sudo bash -c " + shell_escape(cmd)
965 if communication == DC.ACCESS_SSH:
966 tmp_known_hosts = None
968 # Don't bother with localhost. Makes test easier
969 '-o', 'NoHostAuthenticationForLocalhost=yes',
970 '-o', 'ConnectionAttempts=3',
971 '-o', 'ServerAliveInterval=30',
972 '-o', 'TCPKeepAlive=yes',
977 args.append('-p%d' % port)
979 args.extend(('-i', ident_key))
983 # Create a temporary server key file
984 tmp_known_hosts = _make_server_key_args(
985 server_key, host, port, args)
988 args = [ "/bin/bash", "-c", cmd ]
# connects to the remote host and starts a remote connection
991 proc = subprocess.Popen(args,
993 stdout = subprocess.PIPE,
994 stdin = subprocess.PIPE,
995 stderr = subprocess.PIPE)
997 if communication == DC.ACCESS_SSH:
998 proc._known_hosts = tmp_known_hosts
1000 # send the command to execute
1001 os.write(proc.stdin.fileno(),
1002 base64.b64encode(python_code) + "\n")
1006 msg = os.read(proc.stdout.fileno(), 3)
1009 if e.errno == errno.EINTR:
1015 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1016 msg, proc.stdout.read(), proc.stderr.read())
1021 def _communicate(self, input, timeout=None, err_on_timeout=True):
1024 stdout = None # Return
1025 stderr = None # Return
1029 if timeout is not None:
1030 timelimit = time.time() + timeout
1031 killtime = timelimit + 4
1032 bailtime = timelimit + 4
1035 # Flush stdio buffer. This might block, if the user has
1036 # been writing to .stdin in an uncontrolled fashion.
1039 write_set.append(self.stdin)
1043 read_set.append(self.stdout)
1046 read_set.append(self.stderr)
1050 while read_set or write_set:
1051 if timeout is not None:
1052 curtime = time.time()
1053 if timeout is None or curtime > timelimit:
1054 if curtime > bailtime:
1056 elif curtime > killtime:
1057 signum = signal.SIGKILL
1059 signum = signal.SIGTERM
1061 os.kill(self.pid, signum)
1062 select_timeout = 0.5
1064 select_timeout = timelimit - curtime + 0.1
1066 select_timeout = 1.0
1068 if select_timeout > 1.0:
1069 select_timeout = 1.0
1072 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1073 except select.error,e:
1079 if not rlist and not wlist and not xlist and self.poll() is not None:
1080 # timeout and process exited, say bye
1083 if self.stdin in wlist:
1084 # When select has indicated that the file is writable,
1085 # we can write up to PIPE_BUF bytes without risk
1086 # blocking. POSIX defines PIPE_BUF >= 512
1087 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1088 input_offset += bytes_written
1089 if input_offset >= len(input):
1091 write_set.remove(self.stdin)
1093 if self.stdout in rlist:
1094 data = os.read(self.stdout.fileno(), 1024)
1097 read_set.remove(self.stdout)
1100 if self.stderr in rlist:
1101 data = os.read(self.stderr.fileno(), 1024)
1104 read_set.remove(self.stderr)
1107 # All data exchanged. Translate lists into strings.
1108 if stdout is not None:
1109 stdout = ''.join(stdout)
1110 if stderr is not None:
1111 stderr = ''.join(stderr)
1113 # Translate newlines, if requested. We cannot let the file
1114 # object do the translation: It is based on stdio, which is
1115 # impossible to combine with select (unless forcing no
1117 if self.universal_newlines and hasattr(file, 'newlines'):
1119 stdout = self._translate_newlines(stdout)
1121 stderr = self._translate_newlines(stderr)
1123 if killed and err_on_timeout:
1124 errcode = self.poll()
1125 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1131 return (stdout, stderr)