1 # -*- coding: utf-8 -*-
3 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 OPENSSH_HAS_PERSIST = None
37 if hasattr(os, "devnull"):
40 DEV_NULL = "/dev/null"
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 hostbyname_cache = dict()
46 def gethostbyname(host):
47 hostbyname = hostbyname_cache.get(host)
49 hostbyname = socket.gethostbyname(host)
50 hostbyname_cache[host] = hostbyname
53 def openssh_has_persist():
54 global OPENSSH_HAS_PERSIST
55 if OPENSSH_HAS_PERSIST is None:
56 proc = subprocess.Popen(["ssh","-v"],
57 stdout = subprocess.PIPE,
58 stderr = subprocess.STDOUT,
59 stdin = open("/dev/null","r") )
60 out,err = proc.communicate()
63 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
64 OPENSSH_HAS_PERSIST = bool(vre.match(out))
65 return OPENSSH_HAS_PERSIST
68 """ Escapes strings so that they are safe to use as command-line arguments """
69 if SHELL_SAFE.match(s):
70 # safe string - no escaping needed
73 # unsafe string - escape
75 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
78 return "'$'\\x%02x''" % (ord(c),)
79 s = ''.join(map(escp,s))
82 def eintr_retry(func):
84 @functools.wraps(func)
86 retry = kw.pop("_retry", False)
87 for i in xrange(0 if retry else 4):
90 except (select.error, socket.error), args:
91 if args[0] == errno.EINTR:
96 if e.errno == errno.EINTR:
101 return func(*p, **kw)
104 class Server(object):
105 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
106 environment_setup = "", clean_root = False):
107 self._root_dir = root_dir
108 self._clean_root = clean_root
110 self._ctrl_sock = None
111 self._log_level = log_level
113 self._environment_setup = environment_setup
118 self.post_daemonize()
122 # cannot return normally after fork because no exec was done.
123 # This means that if we don't do a os._exit(0) here the code that
124 # follows the call to "Server.run()" in the "caller code" will be
125 # executed... but by now it has already been executed after the
126 # first process (the one that did the first fork) returned.
129 print >>sys.stderr, "SERVER_ERROR."
133 print >>sys.stderr, "SERVER_READY."
136 # pipes for process synchronization
140 root = os.path.normpath(self._root_dir)
141 if self._root_dir not in [".", ""] and os.path.exists(root) \
142 and self._clean_root:
144 if not os.path.exists(root):
145 os.makedirs(root, 0755)
153 except OSError, e: # pragma: no cover
154 if e.errno == errno.EINTR:
160 # os.waitpid avoids leaving a <defunct> (zombie) process
161 st = os.waitpid(pid1, 0)[1]
163 raise RuntimeError("Daemonization failed")
164 # return 0 to inform the caller method that this is not the
169 # Decouple from parent environment.
170 os.chdir(self._root_dir)
177 # see ref: "os._exit(0)"
180 # close all open file descriptors.
181 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182 if (max_fd == resource.RLIM_INFINITY):
184 for fd in range(3, max_fd):
191 # Redirect standard file descriptors.
192 stdin = open(DEV_NULL, "r")
193 stderr = stdout = open(STD_ERR, "a", 0)
194 os.dup2(stdin.fileno(), sys.stdin.fileno())
195 # NOTE: sys.stdout.write will still be buffered, even if the file
196 # was opened with 0 buffer
197 os.dup2(stdout.fileno(), sys.stdout.fileno())
198 os.dup2(stderr.fileno(), sys.stderr.fileno())
201 if self._environment_setup:
202 # parse environment variables and pass to child process
203 # do it by executing shell commands, in case there's some heavy setup involved
204 envproc = subprocess.Popen(
206 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207 ( self._environment_setup, ) ],
208 stdin = subprocess.PIPE,
209 stdout = subprocess.PIPE,
210 stderr = subprocess.PIPE
212 out,err = envproc.communicate()
214 # parse new environment
216 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
218 # apply to current environment
219 for name, value in environment.iteritems():
220 os.environ[name] = value
223 if 'PYTHONPATH' in environment:
224 sys.path = environment['PYTHONPATH'].split(':') + sys.path
226 # create control socket
227 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
229 self._ctrl_sock.bind(CTRL_SOCK)
231 # Address in use, check pidfile
234 pidfile = open(CTRL_PID, "r")
243 # Check process liveliness
244 if not os.path.exists("/proc/%d" % (pid,)):
245 # Ok, it's dead, clean the socket
249 self._ctrl_sock.bind(CTRL_SOCK)
251 self._ctrl_sock.listen(0)
254 pidfile = open(CTRL_PID, "w")
255 pidfile.write(str(os.getpid()))
258 # let the parent process know that the daemonization is finished
263 def post_daemonize(self):
264 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265 # Qt, for some strange reason, redefines the SIGCHLD handler to write
266 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
267 # Server daemonization closes all file descriptors from fileno '3',
268 # but the overloaded handler (inherited by the forked process) will
269 # keep trying to write the \0 to fileno 'x', which might have been reused
270 # after closing, for other operations. This is bad bad bad when fileno 'x'
271 # is in use for communication purposes, because unexpected \0 start
272 # appearing in the communication messages... this is exactly what happens
273 # when using netns in daemonized form. Thus, we have no other alternative than
274 # restoring the SIGCHLD handler to the default here.
276 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
279 while not self._stop:
280 conn, addr = self._ctrl_sock.accept()
281 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
283 while not self._stop:
285 msg = self.recv_msg(conn)
286 except socket.timeout, e:
287 #self.log_error("SERVER recv_msg: connection timedout ")
291 self.log_error("CONNECTION LOST")
296 reply = self.stop_action()
298 reply = self.reply_action(msg)
301 self.send_reply(conn, reply)
304 self.log_error("NOTICE: Awaiting for reconnection")
312 def recv_msg(self, conn):
315 while '\n' not in chunk:
317 chunk = conn.recv(1024)
318 except (OSError, socket.error), e:
319 if e[0] != errno.EINTR:
328 data = ''.join(data).split('\n',1)
331 data, self._rdbuf = data
333 decoded = base64.b64decode(data)
334 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send *reply* to the peer as one base64-encoded, newline-terminated line.

    :param conn: connected socket-like object exposing ``sendall``.
    :param reply: raw reply payload (byte string) to encode and transmit.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole line is out; a plain send()
    # may transmit only part of the buffer, which would truncate the reply
    # and lose the terminating newline the line-based protocol relies on.
    conn.sendall(encoded + b"\n")
342 self._ctrl_sock.close()
def stop_action(self):
    """Return the fixed acknowledgement string for a stop action."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Build the default reply for a received message.

    :param msg: the received message; formatted with ``%s``.
    :returns: the string ``"Reply to: <msg>"``.
    """
    # Wrap msg in a one-element tuple so a tuple-valued msg is formatted as
    # a single value instead of being unpacked by the '%' operator.
    return "Reply to: %s" % (msg,)
353 def log_error(self, text = None, context = ''):
355 text = traceback.format_exc()
356 date = time.strftime("%Y-%m-%d %H:%M:%S")
358 context = " (%s)" % (context,)
359 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry to stderr when debug logging is on.

    Nothing is written unless the instance log level equals
    ``DC.DEBUG_LEVEL``.

    :param text: message body, emitted on the line after the timestamp.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (timestamp, text))
367 class Forwarder(object):
368 def __init__(self, root_dir = "."):
369 self._ctrl_sock = None
370 self._root_dir = root_dir
376 print >>sys.stderr, "FORWARDER_READY."
377 while not self._stop:
378 data = self.read_data()
380 # Connection to client lost
382 self.send_to_server(data)
384 data = self.recv_from_server()
386 # Connection to server lost
387 raise IOError, "Connection to server lost while "\
389 self.write_data(data)
393 return sys.stdin.readline()
395 def write_data(self, data):
396 sys.stdout.write(data)
397 # sys.stdout.write is buffered, this is why we need to do a flush()
400 def send_to_server(self, data):
402 self._ctrl_sock.send(data)
403 except (IOError, socket.error), e:
404 if e[0] == errno.EPIPE:
406 self._ctrl_sock.send(data)
409 encoded = data.rstrip()
410 msg = base64.b64decode(encoded)
414 def recv_from_server(self):
417 while '\n' not in chunk:
419 chunk = self._ctrl_sock.recv(1024)
420 except (OSError, socket.error), e:
421 if e[0] != errno.EINTR:
429 data = ''.join(data).split('\n',1)
432 data, self._rdbuf = data
438 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440 self._ctrl_sock.connect(sock_addr)
442 def disconnect(self):
444 self._ctrl_sock.close()
448 class Client(object):
449 def __init__(self, root_dir = ".", host = None, port = None, user = None,
450 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451 environment_setup = ""):
452 self.root_dir = root_dir
453 self.addr = (host, port)
457 self.communication = communication
458 self.environment_setup = environment_setup
459 self._stopped = False
460 self._deferreds = collections.deque()
464 if self._process.poll() is None:
465 os.kill(self._process.pid, signal.SIGTERM)
469 root_dir = self.root_dir
470 (host, port) = self.addr
474 communication = self.communication
476 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477 c.forward()" % (root_dir,)
479 self._process = popen_python(python_code,
480 communication = communication,
486 environment_setup = self.environment_setup)
488 # Wait for the forwarder to be ready, otherwise nobody
489 # will be able to connect to it
493 helo = self._process.stderr.readline()
494 if helo == 'FORWARDER_READY.\n':
498 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
500 def send_msg(self, msg):
501 encoded = base64.b64encode(msg)
502 data = "%s\n" % encoded
505 self._process.stdin.write(data)
506 except (IOError, ValueError):
507 # dead process, poll it to un-zombify
510 # try again after reconnect
511 # If it fails again, though, give up
513 self._process.stdin.write(data)
516 self.send_msg(STOP_MSG)
519 def defer_reply(self, transform=None):
521 self._deferreds.append(defer_entry)
523 functools.partial(self.read_reply, defer_entry, transform)
526 def _read_reply(self):
527 data = self._process.stdout.readline()
528 encoded = data.rstrip()
530 # empty == eof == dead process, poll it to un-zombify
533 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534 return base64.b64decode(encoded)
536 def read_reply(self, which=None, transform=None):
537 # Test to see if someone did it already
538 if which is not None and len(which):
540 # ...just return the deferred value
542 return transform(which[0])
546 # Process all deferreds until the one we're looking for
547 # or until the queue is empty
548 while self._deferreds:
550 deferred = self._deferreds.popleft()
555 deferred.append(self._read_reply())
556 if deferred is which:
557 # We reached the one we were looking for
559 return transform(deferred[0])
564 # They've requested a synchronous read
566 return transform(self._read_reply())
568 return self._read_reply()
570 def _make_server_key_args(server_key, host, port, args):
572 Returns a reference to the created temporary file, and adds the
573 corresponding arguments to the given argument list.
575 Make sure to hold onto it until the process is done with the file
578 host = '%s:%s' % (host,port)
579 # Create a temporary server key file
580 tmp_known_hosts = tempfile.NamedTemporaryFile()
582 hostbyname = gethostbyname(host)
584 # Add the intended host key
585 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
587 # If we're not in strict mode, add user-configured keys
588 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590 if os.access(user_hosts_path, os.R_OK):
591 f = open(user_hosts_path, "r")
592 tmp_known_hosts.write(f.read())
595 tmp_known_hosts.flush()
597 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
599 return tmp_known_hosts
601 def popen_ssh_command(command, host, port, user, agent,
608 err_on_timeout = True,
609 connect_timeout = 900,
613 Executes a remote command, returns ((stdout,stderr),process)
616 print "ssh", host, command
618 tmp_known_hosts = None
620 # Don't bother with localhost. Makes test easier
621 '-o', 'NoHostAuthenticationForLocalhost=yes',
622 # XXX: Security vulnerability
623 #'-o', 'StrictHostKeyChecking=no',
624 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
625 '-o', 'ConnectionAttempts=3',
626 '-o', 'ServerAliveInterval=30',
627 '-o', 'TCPKeepAlive=yes',
628 '-l', user, hostip or host]
629 if persistent and openssh_has_persist():
631 '-o', 'ControlMaster=auto',
632 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
633 '-o', 'ControlPersist=60' ])
637 args.append('-p%d' % port)
639 args.extend(('-i', ident_key))
643 # Create a temporary server key file
644 tmp_known_hosts = _make_server_key_args(
645 server_key, host, port, args)
648 for x in xrange(retry or 3):
649 # connects to the remote host and starts a remote connection
650 proc = subprocess.Popen(args,
651 stdout = subprocess.PIPE,
652 stdin = subprocess.PIPE,
653 stderr = subprocess.PIPE)
655 # attach tempfile object to the process, to make sure the file stays
656 # alive until the process is finished with it
657 proc._known_hosts = tmp_known_hosts
660 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
662 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
663 # SSH error, can safely retry
666 # Probably timed out or plain failed but can retry
669 except RuntimeError,e:
673 print " timedout -> ", e.args
677 print " -> ", out, err
679 return ((out, err), proc)
681 def popen_scp(source, dest,
688 Copies from/to remote sites.
690 Source and destination should have the user and host encoded
693 If source is a file object, a special mode will be used to
694 create the remote file with the same contents.
696 If dest is a file object, the remote file (source) will be
697 read and written into dest.
699 In these modes, recursive cannot be True.
701 Source can be a list of files to copy to a single destination,
702 in which case it is advised that the destination be a folder.
706 print "scp", source, dest
708 if isinstance(source, file) and source.tell() == 0:
710 elif hasattr(source, 'read'):
711 tmp = tempfile.NamedTemporaryFile()
713 buf = source.read(65536)
721 if isinstance(source, file) or isinstance(dest, file) \
722 or hasattr(source, 'read') or hasattr(dest, 'write'):
725 # Parse source/destination as <user>@<server>:<path>
726 if isinstance(dest, basestring) and ':' in dest:
727 remspec, path = dest.split(':',1)
728 elif isinstance(source, basestring) and ':' in source:
729 remspec, path = source.split(':',1)
731 raise ValueError, "Both endpoints cannot be local"
732 user,host = remspec.rsplit('@',1)
733 tmp_known_hosts = None
735 args = ['ssh', '-l', user, '-C',
736 # Don't bother with localhost. Makes test easier
737 '-o', 'NoHostAuthenticationForLocalhost=yes',
738 # XXX: Security vulnerability
739 #'-o', 'StrictHostKeyChecking=no',
740 '-o', 'ConnectTimeout=900',
741 '-o', 'ConnectionAttempts=3',
742 '-o', 'ServerAliveInterval=30',
743 '-o', 'TCPKeepAlive=yes',
745 if openssh_has_persist():
747 '-o', 'ControlMaster=auto',
748 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
749 '-o', 'ControlPersist=60' ])
751 args.append('-P%d' % port)
753 args.extend(('-i', ident_key))
755 # Create a temporary server key file
756 tmp_known_hosts = _make_server_key_args(
757 server_key, host, port, args)
759 if isinstance(source, file) or hasattr(source, 'read'):
760 args.append('cat > %s' % (shell_escape(path),))
761 elif isinstance(dest, file) or hasattr(dest, 'write'):
762 args.append('cat %s' % (shell_escape(path),))
764 raise AssertionError, "Unreachable code reached! :-Q"
766 # connects to the remote host and starts a remote connection
767 if isinstance(source, file):
768 proc = subprocess.Popen(args,
769 stdout = open('/dev/null','w'),
770 stderr = subprocess.PIPE,
772 err = proc.stderr.read()
773 proc._known_hosts = tmp_known_hosts
774 eintr_retry(proc.wait)()
775 return ((None,err), proc)
776 elif isinstance(dest, file):
777 proc = subprocess.Popen(args,
778 stdout = open('/dev/null','w'),
779 stderr = subprocess.PIPE,
781 err = proc.stderr.read()
782 proc._known_hosts = tmp_known_hosts
783 eintr_retry(proc.wait)()
784 return ((None,err), proc)
785 elif hasattr(source, 'read'):
786 # file-like (but not file) source
787 proc = subprocess.Popen(args,
788 stdout = open('/dev/null','w'),
789 stderr = subprocess.PIPE,
790 stdin = subprocess.PIPE)
796 buf = source.read(4096)
801 rdrdy, wrdy, broken = select.select(
804 [proc.stderr,proc.stdin])
806 if proc.stderr in rdrdy:
807 # use os.read for fully unbuffered behavior
808 err.append(os.read(proc.stderr.fileno(), 4096))
810 if proc.stdin in wrdy:
811 proc.stdin.write(buf)
817 err.append(proc.stderr.read())
819 proc._known_hosts = tmp_known_hosts
820 eintr_retry(proc.wait)()
821 return ((None,''.join(err)), proc)
822 elif hasattr(dest, 'write'):
823 # file-like (but not file) dest
824 proc = subprocess.Popen(args,
825 stdout = subprocess.PIPE,
826 stderr = subprocess.PIPE,
827 stdin = open('/dev/null','w'))
832 rdrdy, wrdy, broken = select.select(
833 [proc.stderr, proc.stdout],
835 [proc.stderr, proc.stdout])
837 if proc.stderr in rdrdy:
838 # use os.read for fully unbuffered behavior
839 err.append(os.read(proc.stderr.fileno(), 4096))
841 if proc.stdout in rdrdy:
842 # use os.read for fully unbuffered behavior
843 buf = os.read(proc.stdout.fileno(), 4096)
852 err.append(proc.stderr.read())
854 proc._known_hosts = tmp_known_hosts
855 eintr_retry(proc.wait)()
856 return ((None,''.join(err)), proc)
858 raise AssertionError, "Unreachable code reached! :-Q"
860 # Parse destination as <user>@<server>:<path>
861 if isinstance(dest, basestring) and ':' in dest:
862 remspec, path = dest.split(':',1)
863 elif isinstance(source, basestring) and ':' in source:
864 remspec, path = source.split(':',1)
866 raise ValueError, "Both endpoints cannot be local"
867 user,host = remspec.rsplit('@',1)
870 tmp_known_hosts = None
871 args = ['scp', '-q', '-p', '-C',
872 # Don't bother with localhost. Makes test easier
873 '-o', 'NoHostAuthenticationForLocalhost=yes',
874 # XXX: Security vulnerability
875 #'-o', 'StrictHostKeyChecking=no',
876 '-o', 'ConnectTimeout=900',
877 '-o', 'ConnectionAttempts=3',
878 '-o', 'ServerAliveInterval=30',
879 '-o', 'TCPKeepAlive=yes' ]
882 args.append('-P%d' % port)
886 args.extend(('-i', ident_key))
888 # Create a temporary server key file
889 tmp_known_hosts = _make_server_key_args(
890 server_key, host, port, args)
891 if isinstance(source,list):
894 if openssh_has_persist():
896 '-o', 'ControlMaster=auto',
897 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
901 # connects to the remote host and starts a remote connection
902 proc = subprocess.Popen(args,
903 stdout = subprocess.PIPE,
904 stdin = subprocess.PIPE,
905 stderr = subprocess.PIPE)
906 proc._known_hosts = tmp_known_hosts
908 comm = proc.communicate()
909 eintr_retry(proc.wait)()
912 def decode_and_execute():
913 # The python code we want to execute might have characters that
914 # are not compatible with the 'inline' mode we are using. To avoid
915 # problems we receive the encoded python code in base64 as a input
916 # stream and decode it for execution.
921 cmd += os.read(0, 1)# one byte from stdin
923 if e.errno == errno.EINTR:
929 cmd = base64.b64decode(cmd)
930 # Uncomment for debug
931 #os.write(2, "Executing python code: %s\n" % cmd)
932 os.write(1, "OK\n") # send a sync message
935 def popen_python(python_code,
936 communication = DC.ACCESS_LOCAL,
946 environment_setup = ""):
950 python_path.replace("'", r"'\''")
951 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
953 if environment_setup:
954 cmd += environment_setup
956 # Uncomment for debug (to run everything under strace)
957 # We had to verify if strace works (cannot nest them)
958 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
960 #cmd += "strace -f -tt -s 200 -o strace$$.out "
962 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
963 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
968 cmd = "sudo bash -c " + shell_escape(cmd)
972 if communication == DC.ACCESS_SSH:
973 tmp_known_hosts = None
975 # Don't bother with localhost. Makes test easier
976 '-o', 'NoHostAuthenticationForLocalhost=yes',
977 # XXX: Security vulnerability
978 #'-o', 'StrictHostKeyChecking=no',
979 '-o', 'ConnectionAttempts=3',
980 '-o', 'ServerAliveInterval=30',
981 '-o', 'TCPKeepAlive=yes',
986 args.append('-p%d' % port)
988 args.extend(('-i', ident_key))
992 # Create a temporary server key file
993 tmp_known_hosts = _make_server_key_args(
994 server_key, host, port, args)
997 args = [ "/bin/bash", "-c", cmd ]
999 # connects to the remote host and starts a remote
1000 proc = subprocess.Popen(args,
1002 stdout = subprocess.PIPE,
1003 stdin = subprocess.PIPE,
1004 stderr = subprocess.PIPE)
1006 if communication == DC.ACCESS_SSH:
1007 proc._known_hosts = tmp_known_hosts
1009 # send the command to execute
1010 os.write(proc.stdin.fileno(),
1011 base64.b64encode(python_code) + "\n")
1015 msg = os.read(proc.stdout.fileno(), 3)
1018 if e.errno == errno.EINTR:
1024 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1025 msg, proc.stdout.read(), proc.stderr.read())
1030 def _communicate(self, input, timeout=None, err_on_timeout=True):
1033 stdout = None # Return
1034 stderr = None # Return
1038 if timeout is not None:
1039 timelimit = time.time() + timeout
1040 killtime = timelimit + 4
1041 bailtime = timelimit + 4
1044 # Flush stdio buffer. This might block, if the user has
1045 # been writing to .stdin in an uncontrolled fashion.
1048 write_set.append(self.stdin)
1052 read_set.append(self.stdout)
1055 read_set.append(self.stderr)
1059 while read_set or write_set:
1060 if timeout is not None:
1061 curtime = time.time()
1062 if timeout is None or curtime > timelimit:
1063 if curtime > bailtime:
1065 elif curtime > killtime:
1066 signum = signal.SIGKILL
1068 signum = signal.SIGTERM
1070 os.kill(self.pid, signum)
1071 select_timeout = 0.5
1073 select_timeout = timelimit - curtime + 0.1
1075 select_timeout = 1.0
1077 if select_timeout > 1.0:
1078 select_timeout = 1.0
1081 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1082 except select.error,e:
1088 if not rlist and not wlist and not xlist and self.poll() is not None:
1089 # timeout and process exited, say bye
1092 if self.stdin in wlist:
1093 # When select has indicated that the file is writable,
1094 # we can write up to PIPE_BUF bytes without risk
1095 # blocking. POSIX defines PIPE_BUF >= 512
1096 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1097 input_offset += bytes_written
1098 if input_offset >= len(input):
1100 write_set.remove(self.stdin)
1102 if self.stdout in rlist:
1103 data = os.read(self.stdout.fileno(), 1024)
1106 read_set.remove(self.stdout)
1109 if self.stderr in rlist:
1110 data = os.read(self.stderr.fileno(), 1024)
1113 read_set.remove(self.stderr)
1116 # All data exchanged. Translate lists into strings.
1117 if stdout is not None:
1118 stdout = ''.join(stdout)
1119 if stderr is not None:
1120 stderr = ''.join(stderr)
1122 # Translate newlines, if requested. We cannot let the file
1123 # object do the translation: It is based on stdio, which is
1124 # impossible to combine with select (unless forcing no
1126 if self.universal_newlines and hasattr(file, 'newlines'):
1128 stdout = self._translate_newlines(stdout)
1130 stderr = self._translate_newlines(stderr)
1132 if killed and err_on_timeout:
1133 errcode = self.poll()
1134 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1140 return (stdout, stderr)