1 # -*- coding: utf-8 -*-
3 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 OPENSSH_HAS_PERSIST = None
37 if hasattr(os, "devnull"):
40 DEV_NULL = "/dev/null"
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 hostbyname_cache = dict()
46 def gethostbyname(host):
47 hostbyname = hostbyname_cache.get(host)
49 hostbyname = socket.gethostbyname(host)
50 hostbyname_cache[host] = hostbyname
def openssh_has_persist():
    """
    Tells whether the local ``ssh`` client reports OpenSSH 5.8 or newer.

    Runs ``ssh -v`` once and matches the start of its combined output
    against an ``OpenSSH_<version>`` pattern accepting 5.8 and later.
    The answer is cached in the module-level OPENSSH_HAS_PERSIST, so the
    subprocess is spawned only while that global is still None.

    Returns True or False.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # The handle used as the child's stdin is closed explicitly;
        # previously it was opened inline and never released.
        devnull = open("/dev/null", "r")
        try:
            proc = subprocess.Popen(["ssh", "-v"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull)
            # stderr is folded into stdout, so only the first element
            # of the returned pair carries data.
            out, err = proc.communicate()
        finally:
            devnull.close()

        # match() anchors at the beginning of the output
        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
68 """ Escapes strings so that they are safe to use as command-line arguments """
69 if SHELL_SAFE.match(s):
70 # safe string - no escaping needed
73 # unsafe string - escape
75 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
78 return "'$'\\x%02x''" % (ord(c),)
79 s = ''.join(map(escp,s))
82 def eintr_retry(func):
84 @functools.wraps(func)
86 retry = kw.pop("_retry", False)
87 for i in xrange(0 if retry else 4):
90 except (select.error, socket.error), args:
91 if args[0] == errno.EINTR:
96 if e.errno == errno.EINTR:
101 return func(*p, **kw)
104 class Server(object):
105 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
106 environment_setup = "", clean_root = False):
107 self._root_dir = root_dir
108 self._clean_root = clean_root
110 self._ctrl_sock = None
111 self._log_level = log_level
113 self._environment_setup = environment_setup
118 self.post_daemonize()
122 # can not return normally after fork because no exec was done.
123 # This means that if we don't do a os._exit(0) here the code that
124 # follows the call to "Server.run()" in the "caller code" will be
125 # executed... but by now it has already been executed after the
126 # first process (the one that did the first fork) returned.
129 print >>sys.stderr, "SERVER_ERROR."
133 print >>sys.stderr, "SERVER_READY."
136 # pipes for process synchronization
140 root = os.path.normpath(self._root_dir)
141 if self._root_dir not in [".", ""] and os.path.exists(root) \
142 and self._clean_root:
144 if not os.path.exists(root):
145 os.makedirs(root, 0755)
153 except OSError, e: # pragma: no cover
154 if e.errno == errno.EINTR:
160 # os.waitpid avoids leaving a <defunct> (zombie) process
161 st = os.waitpid(pid1, 0)[1]
163 raise RuntimeError("Daemonization failed")
164 # return 0 to inform the caller method that this is not the
169 # Decouple from parent environment.
170 os.chdir(self._root_dir)
177 # see ref: "os._exit(0)"
180 # close all open file descriptors.
181 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182 if (max_fd == resource.RLIM_INFINITY):
184 for fd in range(3, max_fd):
191 # Redirect standard file descriptors.
192 stdin = open(DEV_NULL, "r")
193 stderr = stdout = open(STD_ERR, "a", 0)
194 os.dup2(stdin.fileno(), sys.stdin.fileno())
195 # NOTE: sys.stdout.write will still be buffered, even if the file
196 # was opened with 0 buffer
197 os.dup2(stdout.fileno(), sys.stdout.fileno())
198 os.dup2(stderr.fileno(), sys.stderr.fileno())
201 if self._environment_setup:
202 # parse environment variables and pass to child process
203 # do it by executing shell commands, in case there's some heavy setup involved
204 envproc = subprocess.Popen(
206 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207 ( self._environment_setup, ) ],
208 stdin = subprocess.PIPE,
209 stdout = subprocess.PIPE,
210 stderr = subprocess.PIPE
212 out,err = envproc.communicate()
214 # parse new environment
216 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
218 # apply to current environment
219 for name, value in environment.iteritems():
220 os.environ[name] = value
223 if 'PYTHONPATH' in environment:
224 sys.path = environment['PYTHONPATH'].split(':') + sys.path
226 # create control socket
227 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
229 self._ctrl_sock.bind(CTRL_SOCK)
231 # Address in use, check pidfile
234 pidfile = open(CTRL_PID, "r")
243 # Check process liveliness
244 if not os.path.exists("/proc/%d" % (pid,)):
245 # Ok, it's dead, clean the socket
249 self._ctrl_sock.bind(CTRL_SOCK)
251 self._ctrl_sock.listen(0)
254 pidfile = open(CTRL_PID, "w")
255 pidfile.write(str(os.getpid()))
258 # let the parent process know that the daemonization is finished
263 def post_daemonize(self):
264 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265 # QT, for some strange reason, redefines the SIGCHLD handler to write
266 # a \0 to a fd (let's say fileno 'x'), whenever a SIGCHLD is received.
267 # Server daemonization closes all file descriptors from fileno '3',
268 # but the overloaded handler (inherited by the forked process) will
269 # keep trying to write the \0 to fileno 'x', which might have been reused
270 # after closing, for other operations. This is bad bad bad when fileno 'x'
271 # is in use for communication purposes, because unexpected \0 start
272 # appearing in the communication messages... this is exactly what happens
273 # when using netns in daemonized form. Thus, we have no other alternative than
274 # restoring the SIGCHLD handler to the default here.
276 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
279 while not self._stop:
280 conn, addr = self._ctrl_sock.accept()
281 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
283 while not self._stop:
285 msg = self.recv_msg(conn)
286 except socket.timeout, e:
287 #self.log_error("SERVER recv_msg: connection timedout ")
291 self.log_error("CONNECTION LOST")
296 reply = self.stop_action()
298 reply = self.reply_action(msg)
301 self.send_reply(conn, reply)
304 self.log_error("NOTICE: Awaiting for reconnection")
312 def recv_msg(self, conn):
315 while '\n' not in chunk:
317 chunk = conn.recv(1024)
318 except (OSError, socket.error), e:
319 if e[0] != errno.EINTR:
328 data = ''.join(data).split('\n',1)
331 data, self._rdbuf = data
333 decoded = base64.b64decode(data)
334 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Sends a reply through the given connection.

    The reply is base64-encoded and a newline is appended, so each
    reply is a single line.

    conn: connected socket object to write to.
    reply: raw reply data to encode and send.
    """
    encoded = base64.b64encode(reply)
    # sendall() rather than send(): send() may transmit only part of
    # the buffer, which would leave a truncated line on the wire.
    conn.sendall(encoded + b"\n")
342 self._ctrl_sock.close()
def stop_action(self):
    """
    Returns the reply for the stop action: a fixed message.
    """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """
    Builds the reply for a received message.

    msg: the message to answer.

    Returns the message text prefixed with "Reply to: ".
    """
    # Wrap msg in a one-element tuple so that a tuple-valued msg is
    # formatted as a whole instead of being unpacked as format arguments.
    return "Reply to: %s" % (msg,)
353 def log_error(self, text = None, context = ''):
355 text = traceback.format_exc()
356 date = time.strftime("%Y-%m-%d %H:%M:%S")
358 context = " (%s)" % (context,)
359 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """
    Writes a timestamped DEBUG entry to stderr.

    Nothing is written unless this object's log level equals
    DC.DEBUG_LEVEL.

    text: the message to log.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
367 class Forwarder(object):
368 def __init__(self, root_dir = "."):
369 self._ctrl_sock = None
370 self._root_dir = root_dir
376 print >>sys.stderr, "FORWARDER_READY."
377 while not self._stop:
378 data = self.read_data()
380 # Connection to client lost
382 self.send_to_server(data)
384 data = self.recv_from_server()
386 # Connection to server lost
387 raise IOError, "Connection to server lost while "\
389 self.write_data(data)
393 return sys.stdin.readline()
395 def write_data(self, data):
396 sys.stdout.write(data)
397 # sys.stdout.write is buffered, this is why we need to do a flush()
400 def send_to_server(self, data):
402 self._ctrl_sock.send(data)
403 except (IOError, socket.error), e:
404 if e[0] == errno.EPIPE:
406 self._ctrl_sock.send(data)
409 encoded = data.rstrip()
410 msg = base64.b64decode(encoded)
414 def recv_from_server(self):
417 while '\n' not in chunk:
419 chunk = self._ctrl_sock.recv(1024)
420 except (OSError, socket.error), e:
421 if e[0] != errno.EINTR:
429 data = ''.join(data).split('\n',1)
432 data, self._rdbuf = data
438 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440 self._ctrl_sock.connect(sock_addr)
442 def disconnect(self):
444 self._ctrl_sock.close()
448 class Client(object):
449 def __init__(self, root_dir = ".", host = None, port = None, user = None,
450 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451 environment_setup = ""):
452 self.root_dir = root_dir
453 self.addr = (host, port)
457 self.communication = communication
458 self.environment_setup = environment_setup
459 self._stopped = False
460 self._deferreds = collections.deque()
464 if self._process.poll() is None:
465 os.kill(self._process.pid, signal.SIGTERM)
469 root_dir = self.root_dir
470 (host, port) = self.addr
474 communication = self.communication
476 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477 c.forward()" % (root_dir,)
479 self._process = popen_python(python_code,
480 communication = communication,
486 environment_setup = self.environment_setup)
488 # Wait for the forwarder to be ready, otherwise nobody
489 # will be able to connect to it
493 helo = self._process.stderr.readline()
494 if helo == 'FORWARDER_READY.\n':
498 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
500 def send_msg(self, msg):
501 encoded = base64.b64encode(msg)
502 data = "%s\n" % encoded
505 self._process.stdin.write(data)
506 except (IOError, ValueError):
507 # dead process, poll it to un-zombify
510 # try again after reconnect
511 # If it fails again, though, give up
513 self._process.stdin.write(data)
516 self.send_msg(STOP_MSG)
519 def defer_reply(self, transform=None):
521 self._deferreds.append(defer_entry)
523 functools.partial(self.read_reply, defer_entry, transform)
526 def _read_reply(self):
527 data = self._process.stdout.readline()
528 encoded = data.rstrip()
530 # empty == eof == dead process, poll it to un-zombify
533 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534 return base64.b64decode(encoded)
536 def read_reply(self, which=None, transform=None):
537 # Test to see if someone did it already
538 if which is not None and len(which):
540 # ...just return the deferred value
542 return transform(which[0])
546 # Process all deferreds until the one we're looking for
547 # or until the queue is empty
548 while self._deferreds:
550 deferred = self._deferreds.popleft()
555 deferred.append(self._read_reply())
556 if deferred is which:
557 # We reached the one we were looking for
559 return transform(deferred[0])
564 # They've requested a synchronous read
566 return transform(self._read_reply())
568 return self._read_reply()
570 def _make_server_key_args(server_key, host, port, args):
572 Returns a reference to the created temporary file, and adds the
573 corresponding arguments to the given argument list.
575 Make sure to hold onto it until the process is done with the file
578 host = '%s:%s' % (host,port)
579 # Create a temporary server key file
580 tmp_known_hosts = tempfile.NamedTemporaryFile()
582 hostbyname = gethostbyname(host)
584 # Add the intended host key
585 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
587 # If we're not in strict mode, add user-configured keys
588 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590 if os.access(user_hosts_path, os.R_OK):
591 f = open(user_hosts_path, "r")
592 tmp_known_hosts.write(f.read())
595 tmp_known_hosts.flush()
597 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
599 return tmp_known_hosts
601 def popen_ssh_command(command, host, port, user, agent,
608 err_on_timeout = True,
609 connect_timeout = 900,
613 Executes a remote command, returns ((stdout,stderr),process)
616 print "ssh", host, command
618 tmp_known_hosts = None
620 # Don't bother with localhost. Makes test easier
621 '-o', 'NoHostAuthenticationForLocalhost=yes',
622 # XXX: Security vulnerability
623 #'-o', 'StrictHostKeyChecking=no',
624 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
625 '-o', 'ConnectionAttempts=3',
626 '-o', 'ServerAliveInterval=30',
627 '-o', 'TCPKeepAlive=yes',
628 '-l', user, hostip or host]
629 if persistent and openssh_has_persist():
631 '-o', 'ControlMaster=auto',
632 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
633 '-o', 'ControlPersist=60' ])
637 args.append('-p%d' % port)
639 args.extend(('-i', ident_key))
643 # Create a temporary server key file
644 tmp_known_hosts = _make_server_key_args(
645 server_key, host, port, args)
649 for x in xrange(retry or 3):
650 # connects to the remote host and starts a remote connection
651 proc = subprocess.Popen(args,
652 stdout = subprocess.PIPE,
653 stdin = subprocess.PIPE,
654 stderr = subprocess.PIPE)
656 # attach tempfile object to the process, to make sure the file stays
657 # alive until the process is finished with it
658 proc._known_hosts = tmp_known_hosts
661 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
663 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
664 # SSH error, can safely retry
667 # Probably timed out or plain failed but can retry
670 except RuntimeError,e:
674 print " timedout -> ", e.args
678 print " -> ", out, err
680 return ((out, err), proc)
682 def popen_scp(source, dest,
689 Copies from/to remote sites.
691 Source and destination should have the user and host encoded
694 If source is a file object, a special mode will be used to
695 create the remote file with the same contents.
697 If dest is a file object, the remote file (source) will be
698 read and written into dest.
700 In these modes, recursive cannot be True.
702 Source can be a list of files to copy to a single destination,
703 in which case it is advised that the destination be a folder.
707 print "scp", source, dest
709 if isinstance(source, file) and source.tell() == 0:
711 elif hasattr(source, 'read'):
712 tmp = tempfile.NamedTemporaryFile()
714 buf = source.read(65536)
722 if isinstance(source, file) or isinstance(dest, file) \
723 or hasattr(source, 'read') or hasattr(dest, 'write'):
726 # Parse source/destination as <user>@<server>:<path>
727 if isinstance(dest, basestring) and ':' in dest:
728 remspec, path = dest.split(':',1)
729 elif isinstance(source, basestring) and ':' in source:
730 remspec, path = source.split(':',1)
732 raise ValueError, "Both endpoints cannot be local"
733 user,host = remspec.rsplit('@',1)
734 tmp_known_hosts = None
736 args = ['ssh', '-l', user, '-C',
737 # Don't bother with localhost. Makes test easier
738 '-o', 'NoHostAuthenticationForLocalhost=yes',
739 # XXX: Security vulnerability
740 #'-o', 'StrictHostKeyChecking=no',
741 '-o', 'ConnectTimeout=900',
742 '-o', 'ConnectionAttempts=3',
743 '-o', 'ServerAliveInterval=30',
744 '-o', 'TCPKeepAlive=yes',
746 if openssh_has_persist():
748 '-o', 'ControlMaster=auto',
749 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
750 '-o', 'ControlPersist=60' ])
752 args.append('-P%d' % port)
754 args.extend(('-i', ident_key))
756 # Create a temporary server key file
757 tmp_known_hosts = _make_server_key_args(
758 server_key, host, port, args)
760 if isinstance(source, file) or hasattr(source, 'read'):
761 args.append('cat > %s' % (shell_escape(path),))
762 elif isinstance(dest, file) or hasattr(dest, 'write'):
763 args.append('cat %s' % (shell_escape(path),))
765 raise AssertionError, "Unreachable code reached! :-Q"
767 # connects to the remote host and starts a remote connection
768 if isinstance(source, file):
769 proc = subprocess.Popen(args,
770 stdout = open('/dev/null','w'),
771 stderr = subprocess.PIPE,
773 err = proc.stderr.read()
774 proc._known_hosts = tmp_known_hosts
775 eintr_retry(proc.wait)()
776 return ((None,err), proc)
777 elif isinstance(dest, file):
778 proc = subprocess.Popen(args,
779 stdout = open('/dev/null','w'),
780 stderr = subprocess.PIPE,
782 err = proc.stderr.read()
783 proc._known_hosts = tmp_known_hosts
784 eintr_retry(proc.wait)()
785 return ((None,err), proc)
786 elif hasattr(source, 'read'):
787 # file-like (but not file) source
788 proc = subprocess.Popen(args,
789 stdout = open('/dev/null','w'),
790 stderr = subprocess.PIPE,
791 stdin = subprocess.PIPE)
797 buf = source.read(4096)
802 rdrdy, wrdy, broken = select.select(
805 [proc.stderr,proc.stdin])
807 if proc.stderr in rdrdy:
808 # use os.read for fully unbuffered behavior
809 err.append(os.read(proc.stderr.fileno(), 4096))
811 if proc.stdin in wrdy:
812 proc.stdin.write(buf)
818 err.append(proc.stderr.read())
820 proc._known_hosts = tmp_known_hosts
821 eintr_retry(proc.wait)()
822 return ((None,''.join(err)), proc)
823 elif hasattr(dest, 'write'):
824 # file-like (but not file) dest
825 proc = subprocess.Popen(args,
826 stdout = subprocess.PIPE,
827 stderr = subprocess.PIPE,
828 stdin = open('/dev/null','w'))
833 rdrdy, wrdy, broken = select.select(
834 [proc.stderr, proc.stdout],
836 [proc.stderr, proc.stdout])
838 if proc.stderr in rdrdy:
839 # use os.read for fully unbuffered behavior
840 err.append(os.read(proc.stderr.fileno(), 4096))
842 if proc.stdout in rdrdy:
843 # use os.read for fully unbuffered behavior
844 buf = os.read(proc.stdout.fileno(), 4096)
853 err.append(proc.stderr.read())
855 proc._known_hosts = tmp_known_hosts
856 eintr_retry(proc.wait)()
857 return ((None,''.join(err)), proc)
859 raise AssertionError, "Unreachable code reached! :-Q"
861 # Parse destination as <user>@<server>:<path>
862 if isinstance(dest, basestring) and ':' in dest:
863 remspec, path = dest.split(':',1)
864 elif isinstance(source, basestring) and ':' in source:
865 remspec, path = source.split(':',1)
867 raise ValueError, "Both endpoints cannot be local"
868 user,host = remspec.rsplit('@',1)
871 tmp_known_hosts = None
872 args = ['scp', '-q', '-p', '-C',
873 # Don't bother with localhost. Makes test easier
874 '-o', 'NoHostAuthenticationForLocalhost=yes',
875 # XXX: Security vulnerability
876 #'-o', 'StrictHostKeyChecking=no',
877 '-o', 'ConnectTimeout=900',
878 '-o', 'ConnectionAttempts=3',
879 '-o', 'ServerAliveInterval=30',
880 '-o', 'TCPKeepAlive=yes' ]
883 args.append('-P%d' % port)
887 args.extend(('-i', ident_key))
889 # Create a temporary server key file
890 tmp_known_hosts = _make_server_key_args(
891 server_key, host, port, args)
892 if isinstance(source,list):
895 if openssh_has_persist():
897 '-o', 'ControlMaster=auto',
898 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
902 # connects to the remote host and starts a remote connection
903 proc = subprocess.Popen(args,
904 stdout = subprocess.PIPE,
905 stdin = subprocess.PIPE,
906 stderr = subprocess.PIPE)
907 proc._known_hosts = tmp_known_hosts
909 comm = proc.communicate()
910 eintr_retry(proc.wait)()
913 def decode_and_execute():
914 # The python code we want to execute might have characters that
915 # are not compatible with the 'inline' mode we are using. To avoid
916 # problems we receive the encoded python code in base64 as a input
917 # stream and decode it for execution.
922 cmd += os.read(0, 1)# one byte from stdin
924 if e.errno == errno.EINTR:
930 cmd = base64.b64decode(cmd)
931 # Uncomment for debug
932 #os.write(2, "Executing python code: %s\n" % cmd)
933 os.write(1, "OK\n") # send a sync message
936 def popen_python(python_code,
937 communication = DC.ACCESS_LOCAL,
947 environment_setup = ""):
951 python_path.replace("'", r"'\''")
952 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
954 if environment_setup:
955 cmd += environment_setup
957 # Uncomment for debug (to run everything under strace)
958 # We had to verify if strace works (cannot nest them)
959 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
961 #cmd += "strace -f -tt -s 200 -o strace$$.out "
963 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
964 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
969 cmd = "sudo bash -c " + shell_escape(cmd)
973 if communication == DC.ACCESS_SSH:
974 tmp_known_hosts = None
976 # Don't bother with localhost. Makes test easier
977 '-o', 'NoHostAuthenticationForLocalhost=yes',
978 # XXX: Security vulnerability
979 #'-o', 'StrictHostKeyChecking=no',
980 '-o', 'ConnectionAttempts=3',
981 '-o', 'ServerAliveInterval=30',
982 '-o', 'TCPKeepAlive=yes',
987 args.append('-p%d' % port)
989 args.extend(('-i', ident_key))
993 # Create a temporary server key file
994 tmp_known_hosts = _make_server_key_args(
995 server_key, host, port, args)
998 args = [ "/bin/bash", "-c", cmd ]
1000 # connects to the remote host and starts a remote
1001 proc = subprocess.Popen(args,
1003 stdout = subprocess.PIPE,
1004 stdin = subprocess.PIPE,
1005 stderr = subprocess.PIPE)
1007 if communication == DC.ACCESS_SSH:
1008 proc._known_hosts = tmp_known_hosts
1010 # send the command to execute
1011 os.write(proc.stdin.fileno(),
1012 base64.b64encode(python_code) + "\n")
1016 msg = os.read(proc.stdout.fileno(), 3)
1019 if e.errno == errno.EINTR:
1025 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1026 msg, proc.stdout.read(), proc.stderr.read())
1031 def _communicate(self, input, timeout=None, err_on_timeout=True):
1034 stdout = None # Return
1035 stderr = None # Return
1039 if timeout is not None:
1040 timelimit = time.time() + timeout
1041 killtime = timelimit + 4
1042 bailtime = timelimit + 4
1045 # Flush stdio buffer. This might block, if the user has
1046 # been writing to .stdin in an uncontrolled fashion.
1049 write_set.append(self.stdin)
1053 read_set.append(self.stdout)
1056 read_set.append(self.stderr)
1060 while read_set or write_set:
1061 if timeout is not None:
1062 curtime = time.time()
1063 if timeout is None or curtime > timelimit:
1064 if curtime > bailtime:
1066 elif curtime > killtime:
1067 signum = signal.SIGKILL
1069 signum = signal.SIGTERM
1071 os.kill(self.pid, signum)
1072 select_timeout = 0.5
1074 select_timeout = timelimit - curtime + 0.1
1076 select_timeout = 1.0
1078 if select_timeout > 1.0:
1079 select_timeout = 1.0
1082 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1083 except select.error,e:
1089 if not rlist and not wlist and not xlist and self.poll() is not None:
1090 # timeout and process exited, say bye
1093 if self.stdin in wlist:
1094 # When select has indicated that the file is writable,
1095 # we can write up to PIPE_BUF bytes without risk
1096 # blocking. POSIX defines PIPE_BUF >= 512
1097 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1098 input_offset += bytes_written
1099 if input_offset >= len(input):
1101 write_set.remove(self.stdin)
1103 if self.stdout in rlist:
1104 data = os.read(self.stdout.fileno(), 1024)
1107 read_set.remove(self.stdout)
1110 if self.stderr in rlist:
1111 data = os.read(self.stderr.fileno(), 1024)
1114 read_set.remove(self.stderr)
1117 # All data exchanged. Translate lists into strings.
1118 if stdout is not None:
1119 stdout = ''.join(stdout)
1120 if stderr is not None:
1121 stderr = ''.join(stderr)
1123 # Translate newlines, if requested. We cannot let the file
1124 # object do the translation: It is based on stdio, which is
1125 # impossible to combine with select (unless forcing no
1127 if self.universal_newlines and hasattr(file, 'newlines'):
1129 stdout = self._translate_newlines(stdout)
1131 stderr = self._translate_newlines(stderr)
1133 if killed and err_on_timeout:
1134 errcode = self.poll()
1135 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1141 return (stdout, stderr)