1 # -*- coding: utf-8 -*-
3 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 OPENSSH_HAS_PERSIST = None
37 if hasattr(os, "devnull"):
40 DEV_NULL = "/dev/null"
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 hostbyname_cache = dict()
46 def gethostbyname(host):
47 hostbyname = hostbyname_cache.get(host)
49 hostbyname = socket.gethostbyname(host)
50 hostbyname_cache[host] = hostbyname
53 def openssh_has_persist():
54 global OPENSSH_HAS_PERSIST
55 if OPENSSH_HAS_PERSIST is None:
56 proc = subprocess.Popen(["ssh","-v"],
57 stdout = subprocess.PIPE,
58 stderr = subprocess.STDOUT,
59 stdin = open("/dev/null","r") )
60 out,err = proc.communicate()
63 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
64 OPENSSH_HAS_PERSIST = bool(vre.match(out))
65 return OPENSSH_HAS_PERSIST
68 """ Escapes strings so that they are safe to use as command-line arguments """
69 if SHELL_SAFE.match(s):
70 # safe string - no escaping needed
73 # unsafe string - escape
75 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
78 return "'$'\\x%02x''" % (ord(c),)
79 s = ''.join(map(escp,s))
82 def eintr_retry(func):
84 @functools.wraps(func)
86 retry = kw.pop("_retry", False)
87 for i in xrange(0 if retry else 4):
90 except (select.error, socket.error), args:
91 if args[0] == errno.EINTR:
96 if e.errno == errno.EINTR:
101 return func(*p, **kw)
104 class Server(object):
105 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
106 environment_setup = "", clean_root = False):
107 self._root_dir = root_dir
108 self._clean_root = clean_root
110 self._ctrl_sock = None
111 self._log_level = log_level
113 self._environment_setup = environment_setup
118 self.post_daemonize()
122 # can not return normally after fork beacuse no exec was done.
123 # This means that if we don't do a os._exit(0) here the code that
124 # follows the call to "Server.run()" in the "caller code" will be
125 # executed... but by now it has already been executed after the
126 # first process (the one that did the first fork) returned.
129 print >>sys.stderr, "SERVER_ERROR."
133 print >>sys.stderr, "SERVER_READY."
136 # pipes for process synchronization
140 root = os.path.normpath(self._root_dir)
141 if self._root_dir not in [".", ""] and os.path.exists(root) \
142 and self._clean_root:
144 if not os.path.exists(root):
145 os.makedirs(root, 0755)
153 except OSError, e: # pragma: no cover
154 if e.errno == errno.EINTR:
160 # os.waitpid avoids leaving a <defunc> (zombie) process
161 st = os.waitpid(pid1, 0)[1]
163 raise RuntimeError("Daemonization failed")
164 # return 0 to inform the caller method that this is not the
169 # Decouple from parent environment.
170 os.chdir(self._root_dir)
177 # see ref: "os._exit(0)"
180 # close all open file descriptors.
181 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182 if (max_fd == resource.RLIM_INFINITY):
184 for fd in range(3, max_fd):
191 # Redirect standard file descriptors.
192 stdin = open(DEV_NULL, "r")
193 stderr = stdout = open(STD_ERR, "a", 0)
194 os.dup2(stdin.fileno(), sys.stdin.fileno())
195 # NOTE: sys.stdout.write will still be buffered, even if the file
196 # was opened with 0 buffer
197 os.dup2(stdout.fileno(), sys.stdout.fileno())
198 os.dup2(stderr.fileno(), sys.stderr.fileno())
201 if self._environment_setup:
202 # parse environment variables and pass to child process
203 # do it by executing shell commands, in case there's some heavy setup involved
204 envproc = subprocess.Popen(
206 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207 ( self._environment_setup, ) ],
208 stdin = subprocess.PIPE,
209 stdout = subprocess.PIPE,
210 stderr = subprocess.PIPE
212 out,err = envproc.communicate()
214 # parse new environment
216 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
218 # apply to current environment
219 for name, value in environment.iteritems():
220 os.environ[name] = value
223 if 'PYTHONPATH' in environment:
224 sys.path = environment['PYTHONPATH'].split(':') + sys.path
226 # create control socket
227 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
229 self._ctrl_sock.bind(CTRL_SOCK)
231 # Address in use, check pidfile
234 pidfile = open(CTRL_PID, "r")
243 # Check process liveliness
244 if not os.path.exists("/proc/%d" % (pid,)):
245 # Ok, it's dead, clean the socket
249 self._ctrl_sock.bind(CTRL_SOCK)
251 self._ctrl_sock.listen(0)
254 pidfile = open(CTRL_PID, "w")
255 pidfile.write(str(os.getpid()))
258 # let the parent process know that the daemonization is finished
263 def post_daemonize(self):
264 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265 # QT, for some strange reason, redefines the SIGCHILD handler to write
266 # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
267 # Server dameonization closes all file descriptors from fileno '3',
268 # but the overloaded handler (inherited by the forked process) will
269 # keep trying to write the \0 to fileno 'x', which might have been reused
270 # after closing, for other operations. This is bad bad bad when fileno 'x'
271 # is in use for communication pouroses, because unexpected \0 start
272 # appearing in the communication messages... this is exactly what happens
273 # when using netns in daemonized form. Thus, be have no other alternative than
274 # restoring the SIGCHLD handler to the default here.
276 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
279 while not self._stop:
280 conn, addr = self._ctrl_sock.accept()
281 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
283 while not self._stop:
285 msg = self.recv_msg(conn)
286 except socket.timeout, e:
287 #self.log_error("SERVER recv_msg: connection timedout ")
291 self.log_error("CONNECTION LOST")
296 reply = self.stop_action()
298 reply = self.reply_action(msg)
301 self.send_reply(conn, reply)
304 self.log_error("NOTICE: Awaiting for reconnection")
312 def recv_msg(self, conn):
315 while '\n' not in chunk:
317 chunk = conn.recv(1024)
318 except (OSError, socket.error), e:
319 if e[0] != errno.EINTR:
328 data = ''.join(data).split('\n',1)
331 data, self._rdbuf = data
333 decoded = base64.b64decode(data)
334 return decoded.rstrip()
336 def send_reply(self, conn, reply):
337 encoded = base64.b64encode(reply)
338 conn.send("%s\n" % encoded)
342 self._ctrl_sock.close()
347 def stop_action(self):
348 return "Stopping server"
350 def reply_action(self, msg):
351 return "Reply to: %s" % msg
353 def log_error(self, text = None, context = ''):
355 text = traceback.format_exc()
356 date = time.strftime("%Y-%m-%d %H:%M:%S")
358 context = " (%s)" % (context,)
359 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
362 def log_debug(self, text):
363 if self._log_level == DC.DEBUG_LEVEL:
364 date = time.strftime("%Y-%m-%d %H:%M:%S")
365 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
367 class Forwarder(object):
368 def __init__(self, root_dir = "."):
369 self._ctrl_sock = None
370 self._root_dir = root_dir
376 print >>sys.stderr, "FORWARDER_READY."
377 while not self._stop:
378 data = self.read_data()
380 # Connection to client lost
382 self.send_to_server(data)
384 data = self.recv_from_server()
386 # Connection to server lost
387 raise IOError, "Connection to server lost while "\
389 self.write_data(data)
393 return sys.stdin.readline()
395 def write_data(self, data):
396 sys.stdout.write(data)
397 # sys.stdout.write is buffered, this is why we need to do a flush()
400 def send_to_server(self, data):
402 self._ctrl_sock.send(data)
403 except (IOError, socket.error), e:
404 if e[0] == errno.EPIPE:
406 self._ctrl_sock.send(data)
409 encoded = data.rstrip()
410 msg = base64.b64decode(encoded)
414 def recv_from_server(self):
417 while '\n' not in chunk:
419 chunk = self._ctrl_sock.recv(1024)
420 except (OSError, socket.error), e:
421 if e[0] != errno.EINTR:
429 data = ''.join(data).split('\n',1)
432 data, self._rdbuf = data
438 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440 self._ctrl_sock.connect(sock_addr)
442 def disconnect(self):
444 self._ctrl_sock.close()
448 class Client(object):
449 def __init__(self, root_dir = ".", host = None, port = None, user = None,
450 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451 environment_setup = ""):
452 self.root_dir = root_dir
453 self.addr = (host, port)
457 self.communication = communication
458 self.environment_setup = environment_setup
459 self._stopped = False
460 self._deferreds = collections.deque()
464 if self._process.poll() is None:
465 os.kill(self._process.pid, signal.SIGTERM)
469 root_dir = self.root_dir
470 (host, port) = self.addr
474 communication = self.communication
476 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477 c.forward()" % (root_dir,)
479 self._process = popen_python(python_code,
480 communication = communication,
486 environment_setup = self.environment_setup)
488 # Wait for the forwarder to be ready, otherwise nobody
489 # will be able to connect to it
493 helo = self._process.stderr.readline()
494 if helo == 'FORWARDER_READY.\n':
498 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
500 def send_msg(self, msg):
501 encoded = base64.b64encode(msg)
502 data = "%s\n" % encoded
505 self._process.stdin.write(data)
506 except (IOError, ValueError):
507 # dead process, poll it to un-zombify
510 # try again after reconnect
511 # If it fails again, though, give up
513 self._process.stdin.write(data)
516 self.send_msg(STOP_MSG)
519 def defer_reply(self, transform=None):
521 self._deferreds.append(defer_entry)
523 functools.partial(self.read_reply, defer_entry, transform)
526 def _read_reply(self):
527 data = self._process.stdout.readline()
528 encoded = data.rstrip()
530 # empty == eof == dead process, poll it to un-zombify
533 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534 return base64.b64decode(encoded)
536 def read_reply(self, which=None, transform=None):
537 # Test to see if someone did it already
538 if which is not None and len(which):
540 # ...just return the deferred value
542 return transform(which[0])
546 # Process all deferreds until the one we're looking for
547 # or until the queue is empty
548 while self._deferreds:
550 deferred = self._deferreds.popleft()
555 deferred.append(self._read_reply())
556 if deferred is which:
557 # We reached the one we were looking for
559 return transform(deferred[0])
564 # They've requested a synchronous read
566 return transform(self._read_reply())
568 return self._read_reply()
570 def _make_server_key_args(server_key, host, port, args):
572 Returns a reference to the created temporary file, and adds the
573 corresponding arguments to the given argument list.
575 Make sure to hold onto it until the process is done with the file
578 host = '%s:%s' % (host,port)
579 # Create a temporary server key file
580 tmp_known_hosts = tempfile.NamedTemporaryFile()
582 hostbyname = gethostbyname(host)
584 # Add the intended host key
585 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
587 # If we're not in strict mode, add user-configured keys
588 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590 if os.access(user_hosts_path, os.R_OK):
591 f = open(user_hosts_path, "r")
592 tmp_known_hosts.write(f.read())
595 tmp_known_hosts.flush()
597 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
599 return tmp_known_hosts
601 def popen_ssh_command(command, host, port, user, agent,
608 err_on_timeout = True,
609 connect_timeout = 60,
613 Executes a remote commands, returns ((stdout,stderr),process)
616 print "ssh", host, command
618 tmp_known_hosts = None
620 # Don't bother with localhost. Makes test easier
621 '-o', 'NoHostAuthenticationForLocalhost=yes',
622 # XXX: Security vulnerability
623 #'-o', 'StrictHostKeyChecking=no',
624 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
625 '-o', 'ConnectionAttempts=3',
626 '-o', 'ServerAliveInterval=30',
627 '-o', 'TCPKeepAlive=yes',
628 '-l', user, hostip or host]
629 if persistent and openssh_has_persist():
631 '-o', 'ControlMaster=auto',
632 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
633 '-o', 'ControlPersist=60' ])
637 args.append('-p%d' % port)
639 args.extend(('-i', ident_key))
644 # Create a temporary server key file
645 tmp_known_hosts = _make_server_key_args(
646 server_key, host, port, args)
650 for x in xrange(retry or 3):
651 # connects to the remote host and starts a remote connection
652 proc = subprocess.Popen(args,
653 stdout = subprocess.PIPE,
654 stdin = subprocess.PIPE,
655 stderr = subprocess.PIPE)
657 # attach tempfile object to the process, to make sure the file stays
658 # alive until the process is finished with it
659 proc._known_hosts = tmp_known_hosts
662 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
665 print "COMMAND host %s, command %s, error %s" % (host, " ".join(args), err)
666 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
667 # SSH error, can safely retry
670 # Probably timed out or plain failed but can retry
673 except RuntimeError,e:
675 print "COMMAND host %s, command %s, error %s" % (host, " ".join(args), err)
676 print " timedout -> ", e.args
682 print " -> ", out, err
684 return ((out, err), proc)
686 def popen_scp(source, dest,
693 Copies from/to remote sites.
695 Source and destination should have the user and host encoded
698 If source is a file object, a special mode will be used to
699 create the remote file with the same contents.
701 If dest is a file object, the remote file (source) will be
702 read and written into dest.
704 In these modes, recursive cannot be True.
706 Source can be a list of files to copy to a single destination,
707 in which case it is advised that the destination be a folder.
711 print "scp", source, dest
713 if isinstance(source, file) and source.tell() == 0:
715 elif hasattr(source, 'read'):
716 tmp = tempfile.NamedTemporaryFile()
718 buf = source.read(65536)
726 if isinstance(source, file) or isinstance(dest, file) \
727 or hasattr(source, 'read') or hasattr(dest, 'write'):
730 # Parse source/destination as <user>@<server>:<path>
731 if isinstance(dest, basestring) and ':' in dest:
732 remspec, path = dest.split(':',1)
733 elif isinstance(source, basestring) and ':' in source:
734 remspec, path = source.split(':',1)
736 raise ValueError, "Both endpoints cannot be local"
737 user,host = remspec.rsplit('@',1)
738 tmp_known_hosts = None
740 args = ['ssh', '-l', user, '-C',
741 # Don't bother with localhost. Makes test easier
742 '-o', 'NoHostAuthenticationForLocalhost=yes',
743 # XXX: Security vulnerability
744 #'-o', 'StrictHostKeyChecking=no',
745 '-o', 'ConnectTimeout=60',
746 '-o', 'ConnectionAttempts=3',
747 '-o', 'ServerAliveInterval=30',
748 '-o', 'TCPKeepAlive=yes',
750 if openssh_has_persist():
752 '-o', 'ControlMaster=auto',
753 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
754 '-o', 'ControlPersist=60' ])
756 args.append('-P%d' % port)
758 args.extend(('-i', ident_key))
760 # Create a temporary server key file
761 tmp_known_hosts = _make_server_key_args(
762 server_key, host, port, args)
764 if isinstance(source, file) or hasattr(source, 'read'):
765 args.append('cat > %s' % (shell_escape(path),))
766 elif isinstance(dest, file) or hasattr(dest, 'write'):
767 args.append('cat %s' % (shell_escape(path),))
769 raise AssertionError, "Unreachable code reached! :-Q"
771 # connects to the remote host and starts a remote connection
772 if isinstance(source, file):
773 proc = subprocess.Popen(args,
774 stdout = open('/dev/null','w'),
775 stderr = subprocess.PIPE,
777 err = proc.stderr.read()
778 proc._known_hosts = tmp_known_hosts
779 eintr_retry(proc.wait)()
780 return ((None,err), proc)
781 elif isinstance(dest, file):
782 proc = subprocess.Popen(args,
783 stdout = open('/dev/null','w'),
784 stderr = subprocess.PIPE,
786 err = proc.stderr.read()
787 proc._known_hosts = tmp_known_hosts
788 eintr_retry(proc.wait)()
789 return ((None,err), proc)
790 elif hasattr(source, 'read'):
791 # file-like (but not file) source
792 proc = subprocess.Popen(args,
793 stdout = open('/dev/null','w'),
794 stderr = subprocess.PIPE,
795 stdin = subprocess.PIPE)
801 buf = source.read(4096)
806 rdrdy, wrdy, broken = select.select(
809 [proc.stderr,proc.stdin])
811 if proc.stderr in rdrdy:
812 # use os.read for fully unbuffered behavior
813 err.append(os.read(proc.stderr.fileno(), 4096))
815 if proc.stdin in wrdy:
816 proc.stdin.write(buf)
822 err.append(proc.stderr.read())
824 proc._known_hosts = tmp_known_hosts
825 eintr_retry(proc.wait)()
826 return ((None,''.join(err)), proc)
827 elif hasattr(dest, 'write'):
828 # file-like (but not file) dest
829 proc = subprocess.Popen(args,
830 stdout = subprocess.PIPE,
831 stderr = subprocess.PIPE,
832 stdin = open('/dev/null','w'))
837 rdrdy, wrdy, broken = select.select(
838 [proc.stderr, proc.stdout],
840 [proc.stderr, proc.stdout])
842 if proc.stderr in rdrdy:
843 # use os.read for fully unbuffered behavior
844 err.append(os.read(proc.stderr.fileno(), 4096))
846 if proc.stdout in rdrdy:
847 # use os.read for fully unbuffered behavior
848 buf = os.read(proc.stdout.fileno(), 4096)
857 err.append(proc.stderr.read())
859 proc._known_hosts = tmp_known_hosts
860 eintr_retry(proc.wait)()
861 return ((None,''.join(err)), proc)
863 raise AssertionError, "Unreachable code reached! :-Q"
865 # Parse destination as <user>@<server>:<path>
866 if isinstance(dest, basestring) and ':' in dest:
867 remspec, path = dest.split(':',1)
868 elif isinstance(source, basestring) and ':' in source:
869 remspec, path = source.split(':',1)
871 raise ValueError, "Both endpoints cannot be local"
872 user,host = remspec.rsplit('@',1)
875 tmp_known_hosts = None
876 args = ['scp', '-q', '-p', '-C',
877 # Don't bother with localhost. Makes test easier
878 '-o', 'NoHostAuthenticationForLocalhost=yes',
879 # XXX: Security vulnerability
880 #'-o', 'StrictHostKeyChecking=no',
881 '-o', 'ConnectTimeout=60',
882 '-o', 'ConnectionAttempts=3',
883 '-o', 'ServerAliveInterval=30',
884 '-o', 'TCPKeepAlive=yes' ]
887 args.append('-P%d' % port)
891 args.extend(('-i', ident_key))
893 # Create a temporary server key file
894 tmp_known_hosts = _make_server_key_args(
895 server_key, host, port, args)
896 if isinstance(source,list):
899 if openssh_has_persist():
901 '-o', 'ControlMaster=auto',
902 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
906 # connects to the remote host and starts a remote connection
907 proc = subprocess.Popen(args,
908 stdout = subprocess.PIPE,
909 stdin = subprocess.PIPE,
910 stderr = subprocess.PIPE)
911 proc._known_hosts = tmp_known_hosts
913 comm = proc.communicate()
914 eintr_retry(proc.wait)()
917 def decode_and_execute():
918 # The python code we want to execute might have characters that
919 # are not compatible with the 'inline' mode we are using. To avoid
920 # problems we receive the encoded python code in base64 as a input
921 # stream and decode it for execution.
926 cmd += os.read(0, 1)# one byte from stdin
928 if e.errno == errno.EINTR:
934 cmd = base64.b64decode(cmd)
935 # Uncomment for debug
936 #os.write(2, "Executing python code: %s\n" % cmd)
937 os.write(1, "OK\n") # send a sync message
940 def popen_python(python_code,
941 communication = DC.ACCESS_LOCAL,
951 environment_setup = ""):
955 python_path.replace("'", r"'\''")
956 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
958 if environment_setup:
959 cmd += environment_setup
961 # Uncomment for debug (to run everything under strace)
962 # We had to verify if strace works (cannot nest them)
963 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
965 #cmd += "strace -f -tt -s 200 -o strace$$.out "
967 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
968 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
973 cmd = "sudo bash -c " + shell_escape(cmd)
977 if communication == DC.ACCESS_SSH:
978 tmp_known_hosts = None
980 # Don't bother with localhost. Makes test easier
981 '-o', 'NoHostAuthenticationForLocalhost=yes',
982 # XXX: Security vulnerability
983 #'-o', 'StrictHostKeyChecking=no',
984 '-o', 'ConnectionAttempts=3',
985 '-o', 'ServerAliveInterval=30',
986 '-o', 'TCPKeepAlive=yes',
991 args.append('-p%d' % port)
993 args.extend(('-i', ident_key))
997 # Create a temporary server key file
998 tmp_known_hosts = _make_server_key_args(
999 server_key, host, port, args)
1002 args = [ "/bin/bash", "-c", cmd ]
1004 # connects to the remote host and starts a remote
1005 proc = subprocess.Popen(args,
1007 stdout = subprocess.PIPE,
1008 stdin = subprocess.PIPE,
1009 stderr = subprocess.PIPE)
1011 if communication == DC.ACCESS_SSH:
1012 proc._known_hosts = tmp_known_hosts
1014 # send the command to execute
1015 os.write(proc.stdin.fileno(),
1016 base64.b64encode(python_code) + "\n")
1020 msg = os.read(proc.stdout.fileno(), 3)
1023 if e.errno == errno.EINTR:
1029 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1030 msg, proc.stdout.read(), proc.stderr.read())
1035 def _communicate(self, input, timeout=None, err_on_timeout=True):
1038 stdout = None # Return
1039 stderr = None # Return
1043 if timeout is not None:
1044 timelimit = time.time() + timeout
1045 killtime = timelimit + 4
1046 bailtime = timelimit + 4
1049 # Flush stdio buffer. This might block, if the user has
1050 # been writing to .stdin in an uncontrolled fashion.
1053 write_set.append(self.stdin)
1057 read_set.append(self.stdout)
1060 read_set.append(self.stderr)
1064 while read_set or write_set:
1065 if timeout is not None:
1066 curtime = time.time()
1067 if timeout is None or curtime > timelimit:
1068 if curtime > bailtime:
1070 elif curtime > killtime:
1071 signum = signal.SIGKILL
1073 signum = signal.SIGTERM
1075 os.kill(self.pid, signum)
1076 select_timeout = 0.5
1078 select_timeout = timelimit - curtime + 0.1
1080 select_timeout = 1.0
1082 if select_timeout > 1.0:
1083 select_timeout = 1.0
1086 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1087 except select.error,e:
1093 if not rlist and not wlist and not xlist and self.poll() is not None:
1094 # timeout and process exited, say bye
1097 if self.stdin in wlist:
1098 # When select has indicated that the file is writable,
1099 # we can write up to PIPE_BUF bytes without risk
1100 # blocking. POSIX defines PIPE_BUF >= 512
1101 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1102 input_offset += bytes_written
1103 if input_offset >= len(input):
1105 write_set.remove(self.stdin)
1107 if self.stdout in rlist:
1108 data = os.read(self.stdout.fileno(), 1024)
1111 read_set.remove(self.stdout)
1114 if self.stderr in rlist:
1115 data = os.read(self.stderr.fileno(), 1024)
1118 read_set.remove(self.stderr)
1121 # All data exchanged. Translate lists into strings.
1122 if stdout is not None:
1123 stdout = ''.join(stdout)
1124 if stderr is not None:
1125 stderr = ''.join(stderr)
1127 # Translate newlines, if requested. We cannot let the file
1128 # object do the translation: It is based on stdio, which is
1129 # impossible to combine with select (unless forcing no
1131 if self.universal_newlines and hasattr(file, 'newlines'):
1133 stdout = self._translate_newlines(stdout)
1135 stderr = self._translate_newlines(stderr)
1137 if killed and err_on_timeout:
1138 errcode = self.poll()
1139 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1145 return (stdout, stderr)