1 # -*- coding: utf-8 -*-
3 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 OPENSSH_HAS_PERSIST = None
37 if hasattr(os, "devnull"):
40 DEV_NULL = "/dev/null"
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 hostbyname_cache = dict()
def openssh_has_persist():
    """
    Tells whether the local ``ssh`` client is recent enough to be used
    with the ControlPersist option.

    The answer is computed only once, by running ``ssh -v`` and matching
    the beginning of its output against an OpenSSH version pattern, and is
    then cached in the module-level OPENSSH_HAS_PERSIST variable.

    Returns True or False.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Accepted versions: 5.8, 5.9, 5.10 - 5.99, and any major version
        # from 6 upwards
        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)

        # The child only needs an empty stdin. Close our handle as soon
        # as the child is done instead of leaking the file descriptor.
        devnull = open("/dev/null", "r")
        try:
            proc = subprocess.Popen(["ssh", "-v"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull)
            # stderr is merged into stdout, so err is always None here
            out, err = proc.communicate()
        finally:
            devnull.close()

        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
61 """ Escapes strings so that they are safe to use as command-line arguments """
62 if SHELL_SAFE.match(s):
63 # safe string - no escaping needed
66 # unsafe string - escape
68 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
71 return "'$'\\x%02x''" % (ord(c),)
72 s = ''.join(map(escp,s))
75 def eintr_retry(func):
77 @functools.wraps(func)
79 retry = kw.pop("_retry", False)
80 for i in xrange(0 if retry else 4):
83 except (select.error, socket.error), args:
84 if args[0] == errno.EINTR:
89 if e.errno == errno.EINTR:
98 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
99 environment_setup = "", clean_root = False):
100 self._root_dir = root_dir
101 self._clean_root = clean_root
103 self._ctrl_sock = None
104 self._log_level = log_level
106 self._environment_setup = environment_setup
111 self.post_daemonize()
# can not return normally after fork because no exec was done.
116 # This means that if we don't do a os._exit(0) here the code that
117 # follows the call to "Server.run()" in the "caller code" will be
118 # executed... but by now it has already been executed after the
119 # first process (the one that did the first fork) returned.
122 print >>sys.stderr, "SERVER_ERROR."
126 print >>sys.stderr, "SERVER_READY."
129 # pipes for process synchronization
133 root = os.path.normpath(self._root_dir)
134 if self._root_dir not in [".", ""] and os.path.exists(root) \
135 and self._clean_root:
137 if not os.path.exists(root):
138 os.makedirs(root, 0755)
146 except OSError, e: # pragma: no cover
147 if e.errno == errno.EINTR:
# os.waitpid avoids leaving a <defunct> (zombie) process
154 st = os.waitpid(pid1, 0)[1]
156 raise RuntimeError("Daemonization failed")
157 # return 0 to inform the caller method that this is not the
162 # Decouple from parent environment.
163 os.chdir(self._root_dir)
170 # see ref: "os._exit(0)"
173 # close all open file descriptors.
174 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
175 if (max_fd == resource.RLIM_INFINITY):
177 for fd in range(3, max_fd):
184 # Redirect standard file descriptors.
185 stdin = open(DEV_NULL, "r")
186 stderr = stdout = open(STD_ERR, "a", 0)
187 os.dup2(stdin.fileno(), sys.stdin.fileno())
188 # NOTE: sys.stdout.write will still be buffered, even if the file
189 # was opened with 0 buffer
190 os.dup2(stdout.fileno(), sys.stdout.fileno())
191 os.dup2(stderr.fileno(), sys.stderr.fileno())
194 if self._environment_setup:
195 # parse environment variables and pass to child process
196 # do it by executing shell commands, in case there's some heavy setup involved
197 envproc = subprocess.Popen(
199 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
200 ( self._environment_setup, ) ],
201 stdin = subprocess.PIPE,
202 stdout = subprocess.PIPE,
203 stderr = subprocess.PIPE
205 out,err = envproc.communicate()
207 # parse new environment
209 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
211 # apply to current environment
212 for name, value in environment.iteritems():
213 os.environ[name] = value
216 if 'PYTHONPATH' in environment:
217 sys.path = environment['PYTHONPATH'].split(':') + sys.path
219 # create control socket
220 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
222 self._ctrl_sock.bind(CTRL_SOCK)
224 # Address in use, check pidfile
227 pidfile = open(CTRL_PID, "r")
236 # Check process liveliness
237 if not os.path.exists("/proc/%d" % (pid,)):
238 # Ok, it's dead, clean the socket
242 self._ctrl_sock.bind(CTRL_SOCK)
244 self._ctrl_sock.listen(0)
247 pidfile = open(CTRL_PID, "w")
248 pidfile.write(str(os.getpid()))
251 # let the parent process know that the daemonization is finished
def post_daemonize(self):
    """
    Final adjustments to the process environment.

    Publishes the configured log level in the NEPI_CONTROLLER_LOGLEVEL
    environment variable and puts the SIGCHLD handler back to its
    default.
    """
    import signal

    os.environ.update({"NEPI_CONTROLLER_LOGLEVEL": self._log_level})

    # Why SIGCHLD has to be reset:
    # QT, for some strange reason, redefines the SIGCHLD handler so that
    # it writes a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is
    # received. Server daemonization closes all file descriptors from
    # fileno '3' onwards, but the overloaded handler (inherited by the
    # forked process) keeps trying to write the \0 to fileno 'x', which
    # might have been reused for other operations after being closed.
    # This is really bad when fileno 'x' is in use for communication
    # purposes, because unexpected \0 start showing up in the
    # communication messages... which is exactly what happens when using
    # netns in daemonized form. Thus, we have no other alternative than
    # restoring the default handler here.
    default_handler = signal.SIG_DFL
    signal.signal(signal.SIGCHLD, default_handler)
272 while not self._stop:
273 conn, addr = self._ctrl_sock.accept()
274 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
276 while not self._stop:
278 msg = self.recv_msg(conn)
279 except socket.timeout, e:
280 #self.log_error("SERVER recv_msg: connection timedout ")
284 self.log_error("CONNECTION LOST")
289 reply = self.stop_action()
291 reply = self.reply_action(msg)
294 self.send_reply(conn, reply)
297 self.log_error("NOTICE: Awaiting for reconnection")
305 def recv_msg(self, conn):
308 while '\n' not in chunk:
310 chunk = conn.recv(1024)
311 except (OSError, socket.error), e:
312 if e[0] != errno.EINTR:
321 data = ''.join(data).split('\n',1)
324 data, self._rdbuf = data
326 decoded = base64.b64decode(data)
327 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Sends a reply through the given connection.

    The reply is base64 encoded and terminated with a newline, so that
    it travels as exactly one line.

    conn: connected socket-like object to write to.
    reply: string with the raw reply to send.
    """
    encoded = base64.b64encode(reply)
    # socket.send() is allowed to write only part of the buffer, which
    # would leave the peer waiting for the rest of the line. sendall()
    # keeps writing until the whole message went out (or raises).
    conn.sendall(encoded + b"\n")
335 self._ctrl_sock.close()
def stop_action(self):
    """ Returns the text used as reply to a stop request """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """
    Returns the text used as reply to the given message.

    msg: the message that was received.
    """
    # Wrap msg in a one-element tuple: with a bare "%" a tuple message
    # would be taken as the list of format arguments and fail.
    return "Reply to: %s" % (msg,)
346 def log_error(self, text = None, context = ''):
348 text = traceback.format_exc()
349 date = time.strftime("%Y-%m-%d %H:%M:%S")
351 context = " (%s)" % (context,)
352 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """
    Writes a timestamped DEBUG entry to stderr.

    Nothing is written unless the configured log level is
    DC.DEBUG_LEVEL.
    """
    if not (self._log_level == DC.DEBUG_LEVEL):
        return
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (timestamp, text)
    sys.stderr.write(entry)
360 class Forwarder(object):
361 def __init__(self, root_dir = "."):
362 self._ctrl_sock = None
363 self._root_dir = root_dir
369 print >>sys.stderr, "FORWARDER_READY."
370 while not self._stop:
371 data = self.read_data()
373 # Connection to client lost
375 self.send_to_server(data)
377 data = self.recv_from_server()
379 # Connection to server lost
380 raise IOError, "Connection to server lost while "\
382 self.write_data(data)
386 return sys.stdin.readline()
388 def write_data(self, data):
389 sys.stdout.write(data)
390 # sys.stdout.write is buffered, this is why we need to do a flush()
393 def send_to_server(self, data):
395 self._ctrl_sock.send(data)
396 except (IOError, socket.error), e:
397 if e[0] == errno.EPIPE:
399 self._ctrl_sock.send(data)
402 encoded = data.rstrip()
403 msg = base64.b64decode(encoded)
407 def recv_from_server(self):
410 while '\n' not in chunk:
412 chunk = self._ctrl_sock.recv(1024)
413 except (OSError, socket.error), e:
414 if e[0] != errno.EINTR:
422 data = ''.join(data).split('\n',1)
425 data, self._rdbuf = data
431 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
432 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
433 self._ctrl_sock.connect(sock_addr)
435 def disconnect(self):
437 self._ctrl_sock.close()
441 class Client(object):
442 def __init__(self, root_dir = ".", host = None, port = None, user = None,
443 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
444 environment_setup = ""):
445 self.root_dir = root_dir
446 self.addr = (host, port)
450 self.communication = communication
451 self.environment_setup = environment_setup
452 self._stopped = False
453 self._deferreds = collections.deque()
457 if self._process.poll() is None:
458 os.kill(self._process.pid, signal.SIGTERM)
462 root_dir = self.root_dir
463 (host, port) = self.addr
467 communication = self.communication
469 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
470 c.forward()" % (root_dir,)
472 self._process = popen_python(python_code,
473 communication = communication,
479 environment_setup = self.environment_setup)
481 # Wait for the forwarder to be ready, otherwise nobody
482 # will be able to connect to it
486 helo = self._process.stderr.readline()
487 if helo == 'FORWARDER_READY.\n':
491 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
493 def send_msg(self, msg):
494 encoded = base64.b64encode(msg)
495 data = "%s\n" % encoded
498 self._process.stdin.write(data)
499 except (IOError, ValueError):
500 # dead process, poll it to un-zombify
503 # try again after reconnect
504 # If it fails again, though, give up
506 self._process.stdin.write(data)
509 self.send_msg(STOP_MSG)
512 def defer_reply(self, transform=None):
514 self._deferreds.append(defer_entry)
516 functools.partial(self.read_reply, defer_entry, transform)
519 def _read_reply(self):
520 data = self._process.stdout.readline()
521 encoded = data.rstrip()
523 # empty == eof == dead process, poll it to un-zombify
526 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
527 return base64.b64decode(encoded)
529 def read_reply(self, which=None, transform=None):
530 # Test to see if someone did it already
531 if which is not None and len(which):
533 # ...just return the deferred value
535 return transform(which[0])
539 # Process all deferreds until the one we're looking for
540 # or until the queue is empty
541 while self._deferreds:
543 deferred = self._deferreds.popleft()
548 deferred.append(self._read_reply())
549 if deferred is which:
550 # We reached the one we were looking for
552 return transform(deferred[0])
557 # They've requested a synchronous read
559 return transform(self._read_reply())
561 return self._read_reply()
563 def _make_server_key_args(server_key, host, port, args):
565 Returns a reference to the created temporary file, and adds the
566 corresponding arguments to the given argument list.
568 Make sure to hold onto it until the process is done with the file
571 host = '%s:%s' % (host,port)
572 # Create a temporary server key file
573 tmp_known_hosts = tempfile.NamedTemporaryFile()
575 hostbyname = hostbyname_cache.get(host)
577 hostbyname = socket.gethostbyname(host)
578 hostbyname_cache[host] = hostbyname
580 # Add the intended host key
581 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
583 # If we're not in strict mode, add user-configured keys
584 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
585 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
586 if os.access(user_hosts_path, os.R_OK):
587 f = open(user_hosts_path, "r")
588 tmp_known_hosts.write(f.read())
591 tmp_known_hosts.flush()
593 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
595 return tmp_known_hosts
597 def make_connkey(user, host, port):
598 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
599 if len(connkey) > 60:
600 connkey = hashlib.sha1(connkey).hexdigest()
603 def popen_ssh_command(command, host, port, user, agent,
610 err_on_timeout = True,
611 connect_timeout = 30,
Executes a remote command, returns ((stdout,stderr),process)
618 print "ssh", host, command
620 tmp_known_hosts = None
621 connkey = make_connkey(user,host,port)
623 # Don't bother with localhost. Makes test easier
624 '-o', 'NoHostAuthenticationForLocalhost=yes',
625 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
626 '-o', 'ConnectionAttempts=3',
627 '-o', 'ServerAliveInterval=30',
628 '-o', 'TCPKeepAlive=yes',
629 '-l', user, hostip or host]
630 if persistent and openssh_has_persist():
632 '-o', 'ControlMaster=auto',
633 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
634 '-o', 'ControlPersist=60' ])
638 args.append('-p%d' % port)
640 args.extend(('-i', ident_key))
644 # Create a temporary server key file
645 tmp_known_hosts = _make_server_key_args(
646 server_key, host, port, args)
649 for x in xrange(retry or 3):
650 # connects to the remote host and starts a remote connection
651 proc = subprocess.Popen(args,
652 stdout = subprocess.PIPE,
653 stdin = subprocess.PIPE,
654 stderr = subprocess.PIPE)
656 # attach tempfile object to the process, to make sure the file stays
657 # alive until the process is finished with it
658 proc._known_hosts = tmp_known_hosts
661 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
663 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
664 # SSH error, can safely retry
667 # Probably timed out or plain failed but can retry
670 except RuntimeError,e:
674 print " timedout -> ", e.args
678 print " -> ", out, err
680 return ((out, err), proc)
682 def popen_scp(source, dest,
689 Copies from/to remote sites.
691 Source and destination should have the user and host encoded
694 If source is a file object, a special mode will be used to
695 create the remote file with the same contents.
697 If dest is a file object, the remote file (source) will be
698 read and written into dest.
700 In these modes, recursive cannot be True.
702 Source can be a list of files to copy to a single destination,
703 in which case it is advised that the destination be a folder.
707 print "scp", source, dest
709 if isinstance(source, file) and source.tell() == 0:
711 elif hasattr(source, 'read'):
712 tmp = tempfile.NamedTemporaryFile()
714 buf = source.read(65536)
722 if isinstance(source, file) or isinstance(dest, file) \
723 or hasattr(source, 'read') or hasattr(dest, 'write'):
726 # Parse source/destination as <user>@<server>:<path>
727 if isinstance(dest, basestring) and ':' in dest:
728 remspec, path = dest.split(':',1)
729 elif isinstance(source, basestring) and ':' in source:
730 remspec, path = source.split(':',1)
732 raise ValueError, "Both endpoints cannot be local"
733 user,host = remspec.rsplit('@',1)
734 tmp_known_hosts = None
736 connkey = make_connkey(user,host,port)
737 args = ['ssh', '-l', user, '-C',
738 # Don't bother with localhost. Makes test easier
739 '-o', 'NoHostAuthenticationForLocalhost=yes',
740 '-o', 'ConnectTimeout=30',
741 '-o', 'ConnectionAttempts=3',
742 '-o', 'ServerAliveInterval=30',
743 '-o', 'TCPKeepAlive=yes',
745 if openssh_has_persist():
747 '-o', 'ControlMaster=auto',
748 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
749 '-o', 'ControlPersist=60' ])
751 args.append('-P%d' % port)
753 args.extend(('-i', ident_key))
755 # Create a temporary server key file
756 tmp_known_hosts = _make_server_key_args(
757 server_key, host, port, args)
759 if isinstance(source, file) or hasattr(source, 'read'):
760 args.append('cat > %s' % (shell_escape(path),))
761 elif isinstance(dest, file) or hasattr(dest, 'write'):
762 args.append('cat %s' % (shell_escape(path),))
764 raise AssertionError, "Unreachable code reached! :-Q"
766 # connects to the remote host and starts a remote connection
767 if isinstance(source, file):
768 proc = subprocess.Popen(args,
769 stdout = open('/dev/null','w'),
770 stderr = subprocess.PIPE,
772 err = proc.stderr.read()
773 proc._known_hosts = tmp_known_hosts
774 eintr_retry(proc.wait)()
775 return ((None,err), proc)
776 elif isinstance(dest, file):
777 proc = subprocess.Popen(args,
778 stdout = open('/dev/null','w'),
779 stderr = subprocess.PIPE,
781 err = proc.stderr.read()
782 proc._known_hosts = tmp_known_hosts
783 eintr_retry(proc.wait)()
784 return ((None,err), proc)
785 elif hasattr(source, 'read'):
786 # file-like (but not file) source
787 proc = subprocess.Popen(args,
788 stdout = open('/dev/null','w'),
789 stderr = subprocess.PIPE,
790 stdin = subprocess.PIPE)
796 buf = source.read(4096)
801 rdrdy, wrdy, broken = select.select(
804 [proc.stderr,proc.stdin])
806 if proc.stderr in rdrdy:
807 # use os.read for fully unbuffered behavior
808 err.append(os.read(proc.stderr.fileno(), 4096))
810 if proc.stdin in wrdy:
811 proc.stdin.write(buf)
817 err.append(proc.stderr.read())
819 proc._known_hosts = tmp_known_hosts
820 eintr_retry(proc.wait)()
821 return ((None,''.join(err)), proc)
822 elif hasattr(dest, 'write'):
823 # file-like (but not file) dest
824 proc = subprocess.Popen(args,
825 stdout = subprocess.PIPE,
826 stderr = subprocess.PIPE,
827 stdin = open('/dev/null','w'))
832 rdrdy, wrdy, broken = select.select(
833 [proc.stderr, proc.stdout],
835 [proc.stderr, proc.stdout])
837 if proc.stderr in rdrdy:
838 # use os.read for fully unbuffered behavior
839 err.append(os.read(proc.stderr.fileno(), 4096))
841 if proc.stdout in rdrdy:
842 # use os.read for fully unbuffered behavior
843 buf = os.read(proc.stdout.fileno(), 4096)
852 err.append(proc.stderr.read())
854 proc._known_hosts = tmp_known_hosts
855 eintr_retry(proc.wait)()
856 return ((None,''.join(err)), proc)
858 raise AssertionError, "Unreachable code reached! :-Q"
860 # Parse destination as <user>@<server>:<path>
861 if isinstance(dest, basestring) and ':' in dest:
862 remspec, path = dest.split(':',1)
863 elif isinstance(source, basestring) and ':' in source:
864 remspec, path = source.split(':',1)
866 raise ValueError, "Both endpoints cannot be local"
867 user,host = remspec.rsplit('@',1)
870 tmp_known_hosts = None
871 args = ['scp', '-q', '-p', '-C',
872 # Don't bother with localhost. Makes test easier
873 '-o', 'NoHostAuthenticationForLocalhost=yes',
874 '-o', 'ConnectTimeout=30',
875 '-o', 'ConnectionAttempts=3',
876 '-o', 'ServerAliveInterval=30',
877 '-o', 'TCPKeepAlive=yes' ]
880 args.append('-P%d' % port)
884 args.extend(('-i', ident_key))
886 # Create a temporary server key file
887 tmp_known_hosts = _make_server_key_args(
888 server_key, host, port, args)
889 if isinstance(source,list):
892 if openssh_has_persist():
893 connkey = make_connkey(user,host,port)
895 '-o', 'ControlMaster=no',
896 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ) ])
900 # connects to the remote host and starts a remote connection
901 proc = subprocess.Popen(args,
902 stdout = subprocess.PIPE,
903 stdin = subprocess.PIPE,
904 stderr = subprocess.PIPE)
905 proc._known_hosts = tmp_known_hosts
907 comm = proc.communicate()
908 eintr_retry(proc.wait)()
911 def decode_and_execute():
912 # The python code we want to execute might have characters that
913 # are not compatible with the 'inline' mode we are using. To avoid
914 # problems we receive the encoded python code in base64 as a input
915 # stream and decode it for execution.
920 cmd += os.read(0, 1)# one byte from stdin
922 if e.errno == errno.EINTR:
928 cmd = base64.b64decode(cmd)
929 # Uncomment for debug
930 #os.write(2, "Executing python code: %s\n" % cmd)
931 os.write(1, "OK\n") # send a sync message
934 def popen_python(python_code,
935 communication = DC.ACCESS_LOCAL,
945 environment_setup = ""):
949 python_path.replace("'", r"'\''")
950 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
952 if environment_setup:
953 cmd += environment_setup
955 # Uncomment for debug (to run everything under strace)
956 # We had to verify if strace works (cannot nest them)
957 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
959 #cmd += "strace -f -tt -s 200 -o strace$$.out "
961 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
962 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
967 cmd = "sudo bash -c " + shell_escape(cmd)
971 if communication == DC.ACCESS_SSH:
972 tmp_known_hosts = None
974 # Don't bother with localhost. Makes test easier
975 '-o', 'NoHostAuthenticationForLocalhost=yes',
976 '-o', 'ConnectionAttempts=3',
977 '-o', 'ServerAliveInterval=30',
978 '-o', 'TCPKeepAlive=yes',
983 args.append('-p%d' % port)
985 args.extend(('-i', ident_key))
989 # Create a temporary server key file
990 tmp_known_hosts = _make_server_key_args(
991 server_key, host, port, args)
994 args = [ "/bin/bash", "-c", cmd ]
996 # connects to the remote host and starts a remote
997 proc = subprocess.Popen(args,
999 stdout = subprocess.PIPE,
1000 stdin = subprocess.PIPE,
1001 stderr = subprocess.PIPE)
1003 if communication == DC.ACCESS_SSH:
1004 proc._known_hosts = tmp_known_hosts
1006 # send the command to execute
1007 os.write(proc.stdin.fileno(),
1008 base64.b64encode(python_code) + "\n")
1012 msg = os.read(proc.stdout.fileno(), 3)
1015 if e.errno == errno.EINTR:
1021 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1022 msg, proc.stdout.read(), proc.stderr.read())
1027 def _communicate(self, input, timeout=None, err_on_timeout=True):
1030 stdout = None # Return
1031 stderr = None # Return
1035 if timeout is not None:
1036 timelimit = time.time() + timeout
1037 killtime = timelimit + 4
1038 bailtime = timelimit + 4
1041 # Flush stdio buffer. This might block, if the user has
1042 # been writing to .stdin in an uncontrolled fashion.
1045 write_set.append(self.stdin)
1049 read_set.append(self.stdout)
1052 read_set.append(self.stderr)
1056 while read_set or write_set:
1057 if timeout is not None:
1058 curtime = time.time()
1059 if timeout is None or curtime > timelimit:
1060 if curtime > bailtime:
1062 elif curtime > killtime:
1063 signum = signal.SIGKILL
1065 signum = signal.SIGTERM
1067 os.kill(self.pid, signum)
1068 select_timeout = 0.5
1070 select_timeout = timelimit - curtime + 0.1
1072 select_timeout = 1.0
1074 if select_timeout > 1.0:
1075 select_timeout = 1.0
1078 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1079 except select.error,e:
1085 if not rlist and not wlist and not xlist and self.poll() is not None:
1086 # timeout and process exited, say bye
1089 if self.stdin in wlist:
1090 # When select has indicated that the file is writable,
1091 # we can write up to PIPE_BUF bytes without risk
1092 # blocking. POSIX defines PIPE_BUF >= 512
1093 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1094 input_offset += bytes_written
1095 if input_offset >= len(input):
1097 write_set.remove(self.stdin)
1099 if self.stdout in rlist:
1100 data = os.read(self.stdout.fileno(), 1024)
1103 read_set.remove(self.stdout)
1106 if self.stderr in rlist:
1107 data = os.read(self.stderr.fileno(), 1024)
1110 read_set.remove(self.stderr)
1113 # All data exchanged. Translate lists into strings.
1114 if stdout is not None:
1115 stdout = ''.join(stdout)
1116 if stderr is not None:
1117 stderr = ''.join(stderr)
1119 # Translate newlines, if requested. We cannot let the file
1120 # object do the translation: It is based on stdio, which is
1121 # impossible to combine with select (unless forcing no
1123 if self.universal_newlines and hasattr(file, 'newlines'):
1125 stdout = self._translate_newlines(stdout)
1127 stderr = self._translate_newlines(stderr)
1129 if killed and err_on_timeout:
1130 errcode = self.poll()
1131 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1137 return (stdout, stderr)