1 # -*- coding: utf-8 -*-
3 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 OPENSSH_HAS_PERSIST = None
37 if hasattr(os, "devnull"):
40 DEV_NULL = "/dev/null"
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
def openssh_has_persist():
    """
    Tell whether the local ``ssh`` client is recent enough for ControlPersist.

    The client is probed by running ``ssh -v`` and matching an OpenSSH
    version banner at the very start of its combined stdout/stderr output;
    versions 5.8 and newer are accepted. The answer is cached in the
    module-level ``OPENSSH_HAS_PERSIST`` so the probe runs at most once.

    Returns:
        bool: True when the banner reports OpenSSH 5.8 or later.

    Raises:
        OSError: if the ``ssh`` executable cannot be started.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        devnull = open("/dev/null", "r")
        try:
            proc = subprocess.Popen(["ssh", "-v"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull)
            # stderr is merged into stdout, so only the first item matters
            out = proc.communicate()[0]
        finally:
            # do not leak the descriptor for the lifetime of the process
            devnull.close()

        # communicate() yields bytes on Python 3; the pattern below is text
        if not isinstance(out, str):
            out = out.decode("utf-8", "replace")

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
59 """ Escapes strings so that they are safe to use as command-line arguments """
60 if SHELL_SAFE.match(s):
61 # safe string - no escaping needed
64 # unsafe string - escape
66 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
69 return "'$'\\x%02x''" % (ord(c),)
70 s = ''.join(map(escp,s))
73 def eintr_retry(func):
75 @functools.wraps(func)
77 retry = kw.pop("_retry", False)
78 for i in xrange(0 if retry else 4):
81 except (select.error, socket.error), args:
82 if args[0] == errno.EINTR:
87 if e.errno == errno.EINTR:
96 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
97 environment_setup = "", clean_root = False):
98 self._root_dir = root_dir
99 self._clean_root = clean_root
101 self._ctrl_sock = None
102 self._log_level = log_level
104 self._environment_setup = environment_setup
109 self.post_daemonize()
113 # can not return normally after fork because no exec was done.
114 # This means that if we don't do a os._exit(0) here the code that
115 # follows the call to "Server.run()" in the "caller code" will be
116 # executed... but by now it has already been executed after the
117 # first process (the one that did the first fork) returned.
120 print >>sys.stderr, "SERVER_ERROR."
124 print >>sys.stderr, "SERVER_READY."
127 # pipes for process synchronization
131 root = os.path.normpath(self._root_dir)
132 if self._root_dir not in [".", ""] and os.path.exists(root) \
133 and self._clean_root:
135 if not os.path.exists(root):
136 os.makedirs(root, 0755)
144 except OSError, e: # pragma: no cover
145 if e.errno == errno.EINTR:
151 # os.waitpid avoids leaving a <defunct> (zombie) process
152 st = os.waitpid(pid1, 0)[1]
154 raise RuntimeError("Daemonization failed")
155 # return 0 to inform the caller method that this is not the
160 # Decouple from parent environment.
161 os.chdir(self._root_dir)
168 # see ref: "os._exit(0)"
171 # close all open file descriptors.
172 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
173 if (max_fd == resource.RLIM_INFINITY):
175 for fd in range(3, max_fd):
182 # Redirect standard file descriptors.
183 stdin = open(DEV_NULL, "r")
184 stderr = stdout = open(STD_ERR, "a", 0)
185 os.dup2(stdin.fileno(), sys.stdin.fileno())
186 # NOTE: sys.stdout.write will still be buffered, even if the file
187 # was opened with 0 buffer
188 os.dup2(stdout.fileno(), sys.stdout.fileno())
189 os.dup2(stderr.fileno(), sys.stderr.fileno())
192 if self._environment_setup:
193 # parse environment variables and pass to child process
194 # do it by executing shell commands, in case there's some heavy setup involved
195 envproc = subprocess.Popen(
197 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
198 ( self._environment_setup, ) ],
199 stdin = subprocess.PIPE,
200 stdout = subprocess.PIPE,
201 stderr = subprocess.PIPE
203 out,err = envproc.communicate()
205 # parse new environment
207 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
209 # apply to current environment
210 for name, value in environment.iteritems():
211 os.environ[name] = value
214 if 'PYTHONPATH' in environment:
215 sys.path = environment['PYTHONPATH'].split(':') + sys.path
217 # create control socket
218 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
220 self._ctrl_sock.bind(CTRL_SOCK)
222 # Address in use, check pidfile
225 pidfile = open(CTRL_PID, "r")
234 # Check process liveliness
235 if not os.path.exists("/proc/%d" % (pid,)):
236 # Ok, it's dead, clean the socket
240 self._ctrl_sock.bind(CTRL_SOCK)
242 self._ctrl_sock.listen(0)
245 pidfile = open(CTRL_PID, "w")
246 pidfile.write(str(os.getpid()))
249 # let the parent process know that the daemonization is finished
254 def post_daemonize(self):
255 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
256 # QT, for some strange reason, redefines the SIGCHLD handler to write
257 # a \0 to a fd (let's say fileno 'x'), whenever a SIGCHLD is received.
258 # Server daemonization closes all file descriptors from fileno '3',
259 # but the overloaded handler (inherited by the forked process) will
260 # keep trying to write the \0 to fileno 'x', which might have been reused
261 # after closing, for other operations. This is bad bad bad when fileno 'x'
262 # is in use for communication purposes, because unexpected \0 start
263 # appearing in the communication messages... this is exactly what happens
264 # when using netns in daemonized form. Thus, we have no other alternative than
265 # restoring the SIGCHLD handler to the default here.
267 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
270 while not self._stop:
271 conn, addr = self._ctrl_sock.accept()
272 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
274 while not self._stop:
276 msg = self.recv_msg(conn)
277 except socket.timeout, e:
278 #self.log_error("SERVER recv_msg: connection timedout ")
282 self.log_error("CONNECTION LOST")
287 reply = self.stop_action()
289 reply = self.reply_action(msg)
292 self.send_reply(conn, reply)
295 self.log_error("NOTICE: Awaiting for reconnection")
303 def recv_msg(self, conn):
306 while '\n' not in chunk:
308 chunk = conn.recv(1024)
309 except (OSError, socket.error), e:
310 if e[0] != errno.EINTR:
319 data = ''.join(data).split('\n',1)
322 data, self._rdbuf = data
324 decoded = base64.b64decode(data)
325 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Send ``reply`` to the peer as one base64-encoded, newline-terminated line.

    Args:
        conn: connected socket the reply is written to.
        reply: payload to send; text is encoded as UTF-8 before the
            base64 step, byte strings are used as they are.

    Raises:
        socket.error: if the connection fails while sending.
    """
    if not isinstance(reply, bytes):
        reply = reply.encode("utf-8")
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole line is out; a bare send()
    # may transmit only a prefix and silently truncate a large reply.
    conn.sendall(encoded + b"\n")
333 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed reply text used when the server is asked to stop."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Build the reply for a received message by echoing ``msg`` back."""
    template = "Reply to: %s"
    return template % msg
344 def log_error(self, text = None, context = ''):
346 text = traceback.format_exc()
347 date = time.strftime("%Y-%m-%d %H:%M:%S")
349 context = " (%s)" % (context,)
350 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """
    Write a timestamped DEBUG entry for ``text`` to stderr.

    Nothing is written unless the configured log level equals
    ``DC.DEBUG_LEVEL``.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (stamp, text)
    sys.stderr.write(entry)
358 class Forwarder(object):
359 def __init__(self, root_dir = "."):
360 self._ctrl_sock = None
361 self._root_dir = root_dir
367 print >>sys.stderr, "FORWARDER_READY."
368 while not self._stop:
369 data = self.read_data()
371 # Connection to client lost
373 self.send_to_server(data)
375 data = self.recv_from_server()
377 # Connection to server lost
378 raise IOError, "Connection to server lost while "\
380 self.write_data(data)
384 return sys.stdin.readline()
386 def write_data(self, data):
387 sys.stdout.write(data)
388 # sys.stdout.write is buffered, this is why we need to do a flush()
391 def send_to_server(self, data):
393 self._ctrl_sock.send(data)
394 except (IOError, socket.error), e:
395 if e[0] == errno.EPIPE:
397 self._ctrl_sock.send(data)
400 encoded = data.rstrip()
401 msg = base64.b64decode(encoded)
405 def recv_from_server(self):
408 while '\n' not in chunk:
410 chunk = self._ctrl_sock.recv(1024)
411 except (OSError, socket.error), e:
412 if e[0] != errno.EINTR:
420 data = ''.join(data).split('\n',1)
423 data, self._rdbuf = data
429 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
430 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
431 self._ctrl_sock.connect(sock_addr)
433 def disconnect(self):
435 self._ctrl_sock.close()
439 class Client(object):
440 def __init__(self, root_dir = ".", host = None, port = None, user = None,
441 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
442 environment_setup = ""):
443 self.root_dir = root_dir
444 self.addr = (host, port)
448 self.communication = communication
449 self.environment_setup = environment_setup
450 self._stopped = False
451 self._deferreds = collections.deque()
455 if self._process.poll() is None:
456 os.kill(self._process.pid, signal.SIGTERM)
460 root_dir = self.root_dir
461 (host, port) = self.addr
465 communication = self.communication
467 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
468 c.forward()" % (root_dir,)
470 self._process = popen_python(python_code,
471 communication = communication,
477 environment_setup = self.environment_setup)
479 # Wait for the forwarder to be ready, otherwise nobody
480 # will be able to connect to it
484 helo = self._process.stderr.readline()
485 if helo == 'FORWARDER_READY.\n':
489 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
491 def send_msg(self, msg):
492 encoded = base64.b64encode(msg)
493 data = "%s\n" % encoded
496 self._process.stdin.write(data)
497 except (IOError, ValueError):
498 # dead process, poll it to un-zombify
501 # try again after reconnect
502 # If it fails again, though, give up
504 self._process.stdin.write(data)
507 self.send_msg(STOP_MSG)
510 def defer_reply(self, transform=None):
512 self._deferreds.append(defer_entry)
514 functools.partial(self.read_reply, defer_entry, transform)
517 def _read_reply(self):
518 data = self._process.stdout.readline()
519 encoded = data.rstrip()
521 # empty == eof == dead process, poll it to un-zombify
524 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
525 return base64.b64decode(encoded)
527 def read_reply(self, which=None, transform=None):
528 # Test to see if someone did it already
529 if which is not None and len(which):
531 # ...just return the deferred value
533 return transform(which[0])
537 # Process all deferreds until the one we're looking for
538 # or until the queue is empty
539 while self._deferreds:
541 deferred = self._deferreds.popleft()
546 deferred.append(self._read_reply())
547 if deferred is which:
548 # We reached the one we were looking for
550 return transform(deferred[0])
555 # They've requested a synchronous read
557 return transform(self._read_reply())
559 return self._read_reply()
561 def _make_server_key_args(server_key, host, port, args):
563 Returns a reference to the created temporary file, and adds the
564 corresponding arguments to the given argument list.
566 Make sure to hold onto it until the process is done with the file
569 host = '%s:%s' % (host,port)
570 # Create a temporary server key file
571 tmp_known_hosts = tempfile.NamedTemporaryFile()
573 # Add the intended host key
574 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
576 # If we're not in strict mode, add user-configured keys
577 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
578 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
579 if os.access(user_hosts_path, os.R_OK):
580 f = open(user_hosts_path, "r")
581 tmp_known_hosts.write(f.read())
584 tmp_known_hosts.flush()
586 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
588 return tmp_known_hosts
def make_connkey(user, host, port):
    """
    Build a short key identifying an SSH connection by user, host and port.

    The key is the base64 form of ``repr((user, host, port))`` with every
    '/' replaced by '.', which lets callers embed it in a control socket
    file name. Keys longer than 60 characters are replaced by the SHA-1
    hex digest of that text.

    Returns:
        str: the connection key.
    """
    spec = repr((user, host, port))
    if not isinstance(spec, bytes):
        spec = spec.encode("utf-8")

    # The "base64" str codec exists only on Python 2. The base64 module
    # yields the same output, line wrapping included, on Python 2 and 3.
    encode = getattr(base64, "encodebytes", None) or base64.encodestring
    connkey = encode(spec)
    if not isinstance(connkey, str):
        connkey = connkey.decode("ascii")
    connkey = connkey.strip().replace('/', '.')

    if len(connkey) > 60:
        # hashlib accepts bytes only; the key is pure ASCII at this point
        connkey = hashlib.sha1(connkey.encode("ascii")).hexdigest()
    return connkey
596 def popen_ssh_command(command, host, port, user, agent,
603 err_on_timeout = True,
604 connect_timeout = 30,
608 Executes a remote command, returns ((stdout,stderr),process)
611 print "ssh", host, command
613 tmp_known_hosts = None
614 connkey = make_connkey(user,host,port)
616 # Don't bother with localhost. Makes test easier
617 '-o', 'NoHostAuthenticationForLocalhost=yes',
618 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
619 '-o', 'ConnectionAttempts=3',
620 '-o', 'ServerAliveInterval=30',
621 '-o', 'TCPKeepAlive=yes',
622 '-l', user, hostip or host]
623 if persistent and openssh_has_persist():
625 '-o', 'ControlMaster=auto',
626 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
627 '-o', 'ControlPersist=60' ])
631 args.append('-p%d' % port)
633 args.extend(('-i', ident_key))
637 # Create a temporary server key file
638 tmp_known_hosts = _make_server_key_args(
639 server_key, host, port, args)
642 for x in xrange(retry or 3):
643 # connects to the remote host and starts a remote connection
644 proc = subprocess.Popen(args,
645 stdout = subprocess.PIPE,
646 stdin = subprocess.PIPE,
647 stderr = subprocess.PIPE)
649 # attach tempfile object to the process, to make sure the file stays
650 # alive until the process is finished with it
651 proc._known_hosts = tmp_known_hosts
654 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
656 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
657 # SSH error, can safely retry
660 # Probably timed out or plain failed but can retry
663 except RuntimeError,e:
667 print " timedout -> ", e.args
671 print " -> ", out, err
673 return ((out, err), proc)
675 def popen_scp(source, dest,
682 Copies from/to remote sites.
684 Source and destination should have the user and host encoded
687 If source is a file object, a special mode will be used to
688 create the remote file with the same contents.
690 If dest is a file object, the remote file (source) will be
691 read and written into dest.
693 In these modes, recursive cannot be True.
695 Source can be a list of files to copy to a single destination,
696 in which case it is advised that the destination be a folder.
700 print "scp", source, dest
702 if isinstance(source, file) and source.tell() == 0:
704 elif hasattr(source, 'read'):
705 tmp = tempfile.NamedTemporaryFile()
707 buf = source.read(65536)
715 if isinstance(source, file) or isinstance(dest, file) \
716 or hasattr(source, 'read') or hasattr(dest, 'write'):
719 # Parse source/destination as <user>@<server>:<path>
720 if isinstance(dest, basestring) and ':' in dest:
721 remspec, path = dest.split(':',1)
722 elif isinstance(source, basestring) and ':' in source:
723 remspec, path = source.split(':',1)
725 raise ValueError, "Both endpoints cannot be local"
726 user,host = remspec.rsplit('@',1)
727 tmp_known_hosts = None
729 connkey = make_connkey(user,host,port)
730 args = ['ssh', '-l', user, '-C',
731 # Don't bother with localhost. Makes test easier
732 '-o', 'NoHostAuthenticationForLocalhost=yes',
733 '-o', 'ConnectTimeout=30',
734 '-o', 'ConnectionAttempts=3',
735 '-o', 'ServerAliveInterval=30',
736 '-o', 'TCPKeepAlive=yes',
738 if openssh_has_persist():
740 '-o', 'ControlMaster=auto',
741 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
742 '-o', 'ControlPersist=60' ])
744 args.append('-P%d' % port)
746 args.extend(('-i', ident_key))
748 # Create a temporary server key file
749 tmp_known_hosts = _make_server_key_args(
750 server_key, host, port, args)
752 if isinstance(source, file) or hasattr(source, 'read'):
753 args.append('cat > %s' % (shell_escape(path),))
754 elif isinstance(dest, file) or hasattr(dest, 'write'):
755 args.append('cat %s' % (shell_escape(path),))
757 raise AssertionError, "Unreachable code reached! :-Q"
759 # connects to the remote host and starts a remote connection
760 if isinstance(source, file):
761 proc = subprocess.Popen(args,
762 stdout = open('/dev/null','w'),
763 stderr = subprocess.PIPE,
765 err = proc.stderr.read()
766 proc._known_hosts = tmp_known_hosts
767 eintr_retry(proc.wait)()
768 return ((None,err), proc)
769 elif isinstance(dest, file):
770 proc = subprocess.Popen(args,
771 stdout = open('/dev/null','w'),
772 stderr = subprocess.PIPE,
774 err = proc.stderr.read()
775 proc._known_hosts = tmp_known_hosts
776 eintr_retry(proc.wait)()
777 return ((None,err), proc)
778 elif hasattr(source, 'read'):
779 # file-like (but not file) source
780 proc = subprocess.Popen(args,
781 stdout = open('/dev/null','w'),
782 stderr = subprocess.PIPE,
783 stdin = subprocess.PIPE)
789 buf = source.read(4096)
794 rdrdy, wrdy, broken = select.select(
797 [proc.stderr,proc.stdin])
799 if proc.stderr in rdrdy:
800 # use os.read for fully unbuffered behavior
801 err.append(os.read(proc.stderr.fileno(), 4096))
803 if proc.stdin in wrdy:
804 proc.stdin.write(buf)
810 err.append(proc.stderr.read())
812 proc._known_hosts = tmp_known_hosts
813 eintr_retry(proc.wait)()
814 return ((None,''.join(err)), proc)
815 elif hasattr(dest, 'write'):
816 # file-like (but not file) dest
817 proc = subprocess.Popen(args,
818 stdout = subprocess.PIPE,
819 stderr = subprocess.PIPE,
820 stdin = open('/dev/null','w'))
825 rdrdy, wrdy, broken = select.select(
826 [proc.stderr, proc.stdout],
828 [proc.stderr, proc.stdout])
830 if proc.stderr in rdrdy:
831 # use os.read for fully unbuffered behavior
832 err.append(os.read(proc.stderr.fileno(), 4096))
834 if proc.stdout in rdrdy:
835 # use os.read for fully unbuffered behavior
836 buf = os.read(proc.stdout.fileno(), 4096)
845 err.append(proc.stderr.read())
847 proc._known_hosts = tmp_known_hosts
848 eintr_retry(proc.wait)()
849 return ((None,''.join(err)), proc)
851 raise AssertionError, "Unreachable code reached! :-Q"
853 # Parse destination as <user>@<server>:<path>
854 if isinstance(dest, basestring) and ':' in dest:
855 remspec, path = dest.split(':',1)
856 elif isinstance(source, basestring) and ':' in source:
857 remspec, path = source.split(':',1)
859 raise ValueError, "Both endpoints cannot be local"
860 user,host = remspec.rsplit('@',1)
863 tmp_known_hosts = None
864 args = ['scp', '-q', '-p', '-C',
865 # Don't bother with localhost. Makes test easier
866 '-o', 'NoHostAuthenticationForLocalhost=yes',
867 '-o', 'ConnectTimeout=30',
868 '-o', 'ConnectionAttempts=3',
869 '-o', 'ServerAliveInterval=30',
870 '-o', 'TCPKeepAlive=yes' ]
873 args.append('-P%d' % port)
877 args.extend(('-i', ident_key))
879 # Create a temporary server key file
880 tmp_known_hosts = _make_server_key_args(
881 server_key, host, port, args)
882 if isinstance(source,list):
885 if openssh_has_persist():
886 connkey = make_connkey(user,host,port)
888 '-o', 'ControlMaster=no',
889 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ) ])
893 # connects to the remote host and starts a remote connection
894 proc = subprocess.Popen(args,
895 stdout = subprocess.PIPE,
896 stdin = subprocess.PIPE,
897 stderr = subprocess.PIPE)
898 proc._known_hosts = tmp_known_hosts
900 comm = proc.communicate()
901 eintr_retry(proc.wait)()
904 def decode_and_execute():
905 # The python code we want to execute might have characters that
906 # are not compatible with the 'inline' mode we are using. To avoid
907 # problems we receive the encoded python code in base64 as a input
908 # stream and decode it for execution.
913 cmd += os.read(0, 1)# one byte from stdin
915 if e.errno == errno.EINTR:
921 cmd = base64.b64decode(cmd)
922 # Uncomment for debug
923 #os.write(2, "Executing python code: %s\n" % cmd)
924 os.write(1, "OK\n") # send a sync message
927 def popen_python(python_code,
928 communication = DC.ACCESS_LOCAL,
938 environment_setup = ""):
942 python_path.replace("'", r"'\''")
943 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
945 if environment_setup:
946 cmd += environment_setup
948 # Uncomment for debug (to run everything under strace)
949 # We had to verify if strace works (cannot nest them)
950 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
952 #cmd += "strace -f -tt -s 200 -o strace$$.out "
954 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
955 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
960 cmd = "sudo bash -c " + shell_escape(cmd)
964 if communication == DC.ACCESS_SSH:
965 tmp_known_hosts = None
967 # Don't bother with localhost. Makes test easier
968 '-o', 'NoHostAuthenticationForLocalhost=yes',
969 '-o', 'ConnectionAttempts=3',
970 '-o', 'ServerAliveInterval=30',
971 '-o', 'TCPKeepAlive=yes',
976 args.append('-p%d' % port)
978 args.extend(('-i', ident_key))
982 # Create a temporary server key file
983 tmp_known_hosts = _make_server_key_args(
984 server_key, host, port, args)
987 args = [ "/bin/bash", "-c", cmd ]
989 # connects to the remote host and starts a remote
990 proc = subprocess.Popen(args,
992 stdout = subprocess.PIPE,
993 stdin = subprocess.PIPE,
994 stderr = subprocess.PIPE)
996 if communication == DC.ACCESS_SSH:
997 proc._known_hosts = tmp_known_hosts
999 # send the command to execute
1000 os.write(proc.stdin.fileno(),
1001 base64.b64encode(python_code) + "\n")
1005 msg = os.read(proc.stdout.fileno(), 3)
1008 if e.errno == errno.EINTR:
1014 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1015 msg, proc.stdout.read(), proc.stderr.read())
1020 def _communicate(self, input, timeout=None, err_on_timeout=True):
1023 stdout = None # Return
1024 stderr = None # Return
1028 if timeout is not None:
1029 timelimit = time.time() + timeout
1030 killtime = timelimit + 4
1031 bailtime = timelimit + 4
1034 # Flush stdio buffer. This might block, if the user has
1035 # been writing to .stdin in an uncontrolled fashion.
1038 write_set.append(self.stdin)
1042 read_set.append(self.stdout)
1045 read_set.append(self.stderr)
1049 while read_set or write_set:
1050 if timeout is not None:
1051 curtime = time.time()
1052 if timeout is None or curtime > timelimit:
1053 if curtime > bailtime:
1055 elif curtime > killtime:
1056 signum = signal.SIGKILL
1058 signum = signal.SIGTERM
1060 os.kill(self.pid, signum)
1061 select_timeout = 0.5
1063 select_timeout = timelimit - curtime + 0.1
1065 select_timeout = 1.0
1067 if select_timeout > 1.0:
1068 select_timeout = 1.0
1071 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1072 except select.error,e:
1078 if not rlist and not wlist and not xlist and self.poll() is not None:
1079 # timeout and process exited, say bye
1082 if self.stdin in wlist:
1083 # When select has indicated that the file is writable,
1084 # we can write up to PIPE_BUF bytes without risk
1085 # blocking. POSIX defines PIPE_BUF >= 512
1086 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1087 input_offset += bytes_written
1088 if input_offset >= len(input):
1090 write_set.remove(self.stdin)
1092 if self.stdout in rlist:
1093 data = os.read(self.stdout.fileno(), 1024)
1096 read_set.remove(self.stdout)
1099 if self.stderr in rlist:
1100 data = os.read(self.stderr.fileno(), 1024)
1103 read_set.remove(self.stderr)
1106 # All data exchanged. Translate lists into strings.
1107 if stdout is not None:
1108 stdout = ''.join(stdout)
1109 if stderr is not None:
1110 stderr = ''.join(stderr)
1112 # Translate newlines, if requested. We cannot let the file
1113 # object do the translation: It is based on stdio, which is
1114 # impossible to combine with select (unless forcing no
1116 if self.universal_newlines and hasattr(file, 'newlines'):
1118 stdout = self._translate_newlines(stdout)
1120 stderr = self._translate_newlines(stderr)
1122 if killed and err_on_timeout:
1123 errcode = self.poll()
1124 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1130 return (stdout, stderr)