1 # -*- coding: utf-8 -*-
3 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 OPENSSH_HAS_PERSIST = None
37 if hasattr(os, "devnull"):
40 DEV_NULL = "/dev/null"
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 hostbyname_cache = dict()
46 def gethostbyname(host):
47 hostbyname = hostbyname_cache.get(host)
49 hostbyname = socket.gethostbyname(host)
50 hostbyname_cache[host] = hostbyname
53 def openssh_has_persist():
54 global OPENSSH_HAS_PERSIST
55 if OPENSSH_HAS_PERSIST is None:
56 proc = subprocess.Popen(["ssh","-v"],
57 stdout = subprocess.PIPE,
58 stderr = subprocess.STDOUT,
59 stdin = open("/dev/null","r") )
60 out,err = proc.communicate()
63 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
64 OPENSSH_HAS_PERSIST = bool(vre.match(out))
65 return OPENSSH_HAS_PERSIST
68 """ Escapes strings so that they are safe to use as command-line arguments """
69 if SHELL_SAFE.match(s):
70 # safe string - no escaping needed
73 # unsafe string - escape
75 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
78 return "'$'\\x%02x''" % (ord(c),)
79 s = ''.join(map(escp,s))
82 def eintr_retry(func):
84 @functools.wraps(func)
86 retry = kw.pop("_retry", False)
87 for i in xrange(0 if retry else 4):
90 except (select.error, socket.error), args:
91 if args[0] == errno.EINTR:
96 if e.errno == errno.EINTR:
101 return func(*p, **kw)
104 class Server(object):
105 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
106 environment_setup = "", clean_root = False):
107 self._root_dir = root_dir
108 self._clean_root = clean_root
110 self._ctrl_sock = None
111 self._log_level = log_level
113 self._environment_setup = environment_setup
118 self.post_daemonize()
122 # cannot return normally after fork because no exec was done.
123 # This means that if we don't do a os._exit(0) here the code that
124 # follows the call to "Server.run()" in the "caller code" will be
125 # executed... but by now it has already been executed after the
126 # first process (the one that did the first fork) returned.
129 print >>sys.stderr, "SERVER_ERROR."
133 print >>sys.stderr, "SERVER_READY."
136 # pipes for process synchronization
140 root = os.path.normpath(self._root_dir)
141 if self._root_dir not in [".", ""] and os.path.exists(root) \
142 and self._clean_root:
144 if not os.path.exists(root):
145 os.makedirs(root, 0755)
153 except OSError, e: # pragma: no cover
154 if e.errno == errno.EINTR:
160 # os.waitpid avoids leaving a <defunct> (zombie) process
161 st = os.waitpid(pid1, 0)[1]
163 raise RuntimeError("Daemonization failed")
164 # return 0 to inform the caller method that this is not the
169 # Decouple from parent environment.
170 os.chdir(self._root_dir)
177 # see ref: "os._exit(0)"
180 # close all open file descriptors.
181 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182 if (max_fd == resource.RLIM_INFINITY):
184 for fd in range(3, max_fd):
191 # Redirect standard file descriptors.
192 stdin = open(DEV_NULL, "r")
193 stderr = stdout = open(STD_ERR, "a", 0)
194 os.dup2(stdin.fileno(), sys.stdin.fileno())
195 # NOTE: sys.stdout.write will still be buffered, even if the file
196 # was opened with 0 buffer
197 os.dup2(stdout.fileno(), sys.stdout.fileno())
198 os.dup2(stderr.fileno(), sys.stderr.fileno())
201 if self._environment_setup:
202 # parse environment variables and pass to child process
203 # do it by executing shell commands, in case there's some heavy setup involved
204 envproc = subprocess.Popen(
206 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207 ( self._environment_setup, ) ],
208 stdin = subprocess.PIPE,
209 stdout = subprocess.PIPE,
210 stderr = subprocess.PIPE
212 out,err = envproc.communicate()
214 # parse new environment
216 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
218 # apply to current environment
219 for name, value in environment.iteritems():
220 os.environ[name] = value
223 if 'PYTHONPATH' in environment:
224 sys.path = environment['PYTHONPATH'].split(':') + sys.path
226 # create control socket
227 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
229 self._ctrl_sock.bind(CTRL_SOCK)
231 # Address in use, check pidfile
234 pidfile = open(CTRL_PID, "r")
243 # Check process liveness
244 if not os.path.exists("/proc/%d" % (pid,)):
245 # Ok, it's dead, clean the socket
249 self._ctrl_sock.bind(CTRL_SOCK)
251 self._ctrl_sock.listen(0)
254 pidfile = open(CTRL_PID, "w")
255 pidfile.write(str(os.getpid()))
258 # let the parent process know that the daemonization is finished
263 def post_daemonize(self):
264 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265 # QT, for some strange reason, redefines the SIGCHLD handler to write
266 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
267 # Server daemonization closes all file descriptors from fileno '3',
268 # but the overloaded handler (inherited by the forked process) will
269 # keep trying to write the \0 to fileno 'x', which might have been reused
270 # after closing, for other operations. This is bad bad bad when fileno 'x'
271 # is in use for communication purposes, because unexpected \0 start
272 # appearing in the communication messages... this is exactly what happens
273 # when using netns in daemonized form. Thus, we have no other alternative than
274 # restoring the SIGCHLD handler to the default here.
276 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
279 while not self._stop:
280 conn, addr = self._ctrl_sock.accept()
281 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
283 while not self._stop:
285 msg = self.recv_msg(conn)
286 except socket.timeout, e:
287 #self.log_error("SERVER recv_msg: connection timedout ")
291 self.log_error("CONNECTION LOST")
296 reply = self.stop_action()
298 reply = self.reply_action(msg)
301 self.send_reply(conn, reply)
304 self.log_error("NOTICE: Awaiting for reconnection")
312 def recv_msg(self, conn):
315 while '\n' not in chunk:
317 chunk = conn.recv(1024)
318 except (OSError, socket.error), e:
319 if e[0] != errno.EINTR:
328 data = ''.join(data).split('\n',1)
331 data, self._rdbuf = data
333 decoded = base64.b64decode(data)
334 return decoded.rstrip()
336 def send_reply(self, conn, reply):  # send one reply over conn as a single base64-encoded line
337 encoded = base64.b64encode(reply)  # b64encode output contains no newline, so "\n" can safely terminate the message
338 conn.send("%s\n" % encoded)  # NOTE(review): send() may transmit fewer bytes than given; sendall() would guarantee full delivery - confirm whether that matters here
342 self._ctrl_sock.close()
347 def stop_action(self):  # produce the reply text for the stop action; takes no input besides self
348 return "Stopping server"  # always the same fixed string
350 def reply_action(self, msg):  # build the reply for msg; this implementation only echoes it back
351 return "Reply to: %s" % msg  # the received message formatted behind a fixed prefix
353 def log_error(self, text = None, context = ''):
355 text = traceback.format_exc()
356 date = time.strftime("%Y-%m-%d %H:%M:%S")
358 context = " (%s)" % (context,)
359 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
362 def log_debug(self, text):  # write a timestamped DEBUG entry containing text to sys.stderr
363 if self._log_level == DC.DEBUG_LEVEL:  # emitted only when the configured level equals DC.DEBUG_LEVEL; otherwise nothing is written
364 date = time.strftime("%Y-%m-%d %H:%M:%S")  # local time, second resolution
365 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))  # header line with the date, then the text on its own line
367 class Forwarder(object):
368 def __init__(self, root_dir = "."):
369 self._ctrl_sock = None
370 self._root_dir = root_dir
376 print >>sys.stderr, "FORWARDER_READY."
377 while not self._stop:
378 data = self.read_data()
380 # Connection to client lost
382 self.send_to_server(data)
384 data = self.recv_from_server()
386 # Connection to server lost
387 raise IOError, "Connection to server lost while "\
389 self.write_data(data)
393 return sys.stdin.readline()
395 def write_data(self, data):
396 sys.stdout.write(data)
397 # sys.stdout.write is buffered, this is why we need to do a flush()
400 def send_to_server(self, data):
402 self._ctrl_sock.send(data)
403 except (IOError, socket.error), e:
404 if e[0] == errno.EPIPE:
406 self._ctrl_sock.send(data)
409 encoded = data.rstrip()
410 msg = base64.b64decode(encoded)
414 def recv_from_server(self):
417 while '\n' not in chunk:
419 chunk = self._ctrl_sock.recv(1024)
420 except (OSError, socket.error), e:
421 if e[0] != errno.EINTR:
429 data = ''.join(data).split('\n',1)
432 data, self._rdbuf = data
438 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440 self._ctrl_sock.connect(sock_addr)
442 def disconnect(self):
444 self._ctrl_sock.close()
448 class Client(object):
449 def __init__(self, root_dir = ".", host = None, port = None, user = None,
450 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451 environment_setup = ""):
452 self.root_dir = root_dir
453 self.addr = (host, port)
457 self.communication = communication
458 self.environment_setup = environment_setup
459 self._stopped = False
460 self._deferreds = collections.deque()
464 if self._process.poll() is None:
465 os.kill(self._process.pid, signal.SIGTERM)
469 root_dir = self.root_dir
470 (host, port) = self.addr
474 communication = self.communication
476 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477 c.forward()" % (root_dir,)
479 self._process = popen_python(python_code,
480 communication = communication,
486 environment_setup = self.environment_setup)
488 # Wait for the forwarder to be ready, otherwise nobody
489 # will be able to connect to it
493 helo = self._process.stderr.readline()
494 if helo == 'FORWARDER_READY.\n':
498 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
500 def send_msg(self, msg):
501 encoded = base64.b64encode(msg)
502 data = "%s\n" % encoded
505 self._process.stdin.write(data)
506 except (IOError, ValueError):
507 # dead process, poll it to un-zombify
510 # try again after reconnect
511 # If it fails again, though, give up
513 self._process.stdin.write(data)
516 self.send_msg(STOP_MSG)
519 def defer_reply(self, transform=None):
521 self._deferreds.append(defer_entry)
523 functools.partial(self.read_reply, defer_entry, transform)
526 def _read_reply(self):
527 data = self._process.stdout.readline()
528 encoded = data.rstrip()
530 # empty == eof == dead process, poll it to un-zombify
533 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534 return base64.b64decode(encoded)
536 def read_reply(self, which=None, transform=None):
537 # Test to see if someone did it already
538 if which is not None and len(which):
540 # ...just return the deferred value
542 return transform(which[0])
546 # Process all deferreds until the one we're looking for
547 # or until the queue is empty
548 while self._deferreds:
550 deferred = self._deferreds.popleft()
555 deferred.append(self._read_reply())
556 if deferred is which:
557 # We reached the one we were looking for
559 return transform(deferred[0])
564 # They've requested a synchronous read
566 return transform(self._read_reply())
568 return self._read_reply()
570 def _make_server_key_args(server_key, host, port, args):
572 Returns a reference to the created temporary file, and adds the
573 corresponding arguments to the given argument list.
575 Make sure to hold onto it until the process is done with the file
578 host = '%s:%s' % (host,port)
579 # Create a temporary server key file
580 tmp_known_hosts = tempfile.NamedTemporaryFile()
582 hostbyname = gethostbyname(host)
584 # Add the intended host key
585 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
587 # If we're not in strict mode, add user-configured keys
588 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590 if os.access(user_hosts_path, os.R_OK):
591 f = open(user_hosts_path, "r")
592 tmp_known_hosts.write(f.read())
595 tmp_known_hosts.flush()
597 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
599 return tmp_known_hosts
601 def popen_ssh_command(command, host, port, user, agent,
608 err_on_timeout = True,
609 connect_timeout = 60,
613 Executes a remote command, returns ((stdout,stderr),process)
616 tmp_known_hosts = None
618 # Don't bother with localhost. Makes test easier
619 '-o', 'NoHostAuthenticationForLocalhost=yes',
620 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
621 '-o', 'ConnectionAttempts=3',
622 '-o', 'ServerAliveInterval=30',
623 '-o', 'TCPKeepAlive=yes',
624 '-l', user, hostip or host]
625 if persistent and openssh_has_persist():
627 '-o', 'ControlMaster=auto',
628 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
629 '-o', 'ControlPersist=60' ])
633 args.append('-p%d' % port)
635 args.extend(('-i', ident_key))
640 # Create a temporary server key file
641 tmp_known_hosts = _make_server_key_args(
642 server_key, host, port, args)
645 for x in xrange(retry or 3):
646 # connects to the remote host and starts a remote connection
647 proc = subprocess.Popen(args,
648 stdout = subprocess.PIPE,
649 stdin = subprocess.PIPE,
650 stderr = subprocess.PIPE)
652 # attach tempfile object to the process, to make sure the file stays
653 # alive until the process is finished with it
654 proc._known_hosts = tmp_known_hosts
657 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
659 print "COMMAND host %s, command %s, out %s, error %s" % (host, " ".join(args), out, err)
662 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
663 # SSH error, can safely retry
666 # Probably timed out or plain failed but can retry
669 except RuntimeError,e:
671 print "EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT -> %s" % (
672 host, " ".join(args), out, err, e.args)
678 return ((out, err), proc)
680 def popen_scp(source, dest,
687 Copies from/to remote sites.
689 Source and destination should have the user and host encoded
692 If source is a file object, a special mode will be used to
693 create the remote file with the same contents.
695 If dest is a file object, the remote file (source) will be
696 read and written into dest.
698 In these modes, recursive cannot be True.
700 Source can be a list of files to copy to a single destination,
701 in which case it is advised that the destination be a folder.
705 print "scp", source, dest
707 if isinstance(source, file) and source.tell() == 0:
709 elif hasattr(source, 'read'):
710 tmp = tempfile.NamedTemporaryFile()
712 buf = source.read(65536)
720 if isinstance(source, file) or isinstance(dest, file) \
721 or hasattr(source, 'read') or hasattr(dest, 'write'):
724 # Parse source/destination as <user>@<server>:<path>
725 if isinstance(dest, basestring) and ':' in dest:
726 remspec, path = dest.split(':',1)
727 elif isinstance(source, basestring) and ':' in source:
728 remspec, path = source.split(':',1)
730 raise ValueError, "Both endpoints cannot be local"
731 user,host = remspec.rsplit('@',1)
732 tmp_known_hosts = None
734 args = ['ssh', '-l', user, '-C',
735 # Don't bother with localhost. Makes test easier
736 '-o', 'NoHostAuthenticationForLocalhost=yes',
737 '-o', 'ConnectTimeout=60',
738 '-o', 'ConnectionAttempts=3',
739 '-o', 'ServerAliveInterval=30',
740 '-o', 'TCPKeepAlive=yes',
742 if openssh_has_persist():
744 '-o', 'ControlMaster=auto',
745 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
746 '-o', 'ControlPersist=60' ])
748 args.append('-P%d' % port)
750 args.extend(('-i', ident_key))
752 # Create a temporary server key file
753 tmp_known_hosts = _make_server_key_args(
754 server_key, host, port, args)
756 if isinstance(source, file) or hasattr(source, 'read'):
757 args.append('cat > %s' % (shell_escape(path),))
758 elif isinstance(dest, file) or hasattr(dest, 'write'):
759 args.append('cat %s' % (shell_escape(path),))
761 raise AssertionError, "Unreachable code reached! :-Q"
763 # connects to the remote host and starts a remote connection
764 if isinstance(source, file):
765 proc = subprocess.Popen(args,
766 stdout = open('/dev/null','w'),
767 stderr = subprocess.PIPE,
769 err = proc.stderr.read()
770 proc._known_hosts = tmp_known_hosts
771 eintr_retry(proc.wait)()
772 return ((None,err), proc)
773 elif isinstance(dest, file):
774 proc = subprocess.Popen(args,
775 stdout = open('/dev/null','w'),
776 stderr = subprocess.PIPE,
778 err = proc.stderr.read()
779 proc._known_hosts = tmp_known_hosts
780 eintr_retry(proc.wait)()
781 return ((None,err), proc)
782 elif hasattr(source, 'read'):
783 # file-like (but not file) source
784 proc = subprocess.Popen(args,
785 stdout = open('/dev/null','w'),
786 stderr = subprocess.PIPE,
787 stdin = subprocess.PIPE)
793 buf = source.read(4096)
798 rdrdy, wrdy, broken = select.select(
801 [proc.stderr,proc.stdin])
803 if proc.stderr in rdrdy:
804 # use os.read for fully unbuffered behavior
805 err.append(os.read(proc.stderr.fileno(), 4096))
807 if proc.stdin in wrdy:
808 proc.stdin.write(buf)
814 err.append(proc.stderr.read())
816 proc._known_hosts = tmp_known_hosts
817 eintr_retry(proc.wait)()
818 return ((None,''.join(err)), proc)
819 elif hasattr(dest, 'write'):
820 # file-like (but not file) dest
821 proc = subprocess.Popen(args,
822 stdout = subprocess.PIPE,
823 stderr = subprocess.PIPE,
824 stdin = open('/dev/null','w'))
829 rdrdy, wrdy, broken = select.select(
830 [proc.stderr, proc.stdout],
832 [proc.stderr, proc.stdout])
834 if proc.stderr in rdrdy:
835 # use os.read for fully unbuffered behavior
836 err.append(os.read(proc.stderr.fileno(), 4096))
838 if proc.stdout in rdrdy:
839 # use os.read for fully unbuffered behavior
840 buf = os.read(proc.stdout.fileno(), 4096)
849 err.append(proc.stderr.read())
851 proc._known_hosts = tmp_known_hosts
852 eintr_retry(proc.wait)()
853 return ((None,''.join(err)), proc)
855 raise AssertionError, "Unreachable code reached! :-Q"
857 # Parse destination as <user>@<server>:<path>
858 if isinstance(dest, basestring) and ':' in dest:
859 remspec, path = dest.split(':',1)
860 elif isinstance(source, basestring) and ':' in source:
861 remspec, path = source.split(':',1)
863 raise ValueError, "Both endpoints cannot be local"
864 user,host = remspec.rsplit('@',1)
867 tmp_known_hosts = None
868 args = ['scp', '-q', '-p', '-C',
869 # Don't bother with localhost. Makes test easier
870 '-o', 'NoHostAuthenticationForLocalhost=yes',
871 '-o', 'ConnectTimeout=60',
872 '-o', 'ConnectionAttempts=3',
873 '-o', 'ServerAliveInterval=30',
874 '-o', 'TCPKeepAlive=yes' ]
877 args.append('-P%d' % port)
881 args.extend(('-i', ident_key))
883 # Create a temporary server key file
884 tmp_known_hosts = _make_server_key_args(
885 server_key, host, port, args)
886 if isinstance(source,list):
889 if openssh_has_persist():
891 '-o', 'ControlMaster=auto',
892 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
896 # connects to the remote host and starts a remote connection
897 proc = subprocess.Popen(args,
898 stdout = subprocess.PIPE,
899 stdin = subprocess.PIPE,
900 stderr = subprocess.PIPE)
901 proc._known_hosts = tmp_known_hosts
903 comm = proc.communicate()
904 eintr_retry(proc.wait)()
907 def decode_and_execute():
908 # The python code we want to execute might have characters that
909 # are not compatible with the 'inline' mode we are using. To avoid
910 # problems we receive the encoded python code in base64 as an input
911 # stream and decode it for execution.
916 cmd += os.read(0, 1)# one byte from stdin
918 if e.errno == errno.EINTR:
924 cmd = base64.b64decode(cmd)
925 # Uncomment for debug
926 #os.write(2, "Executing python code: %s\n" % cmd)
927 os.write(1, "OK\n") # send a sync message
930 def popen_python(python_code,
931 communication = DC.ACCESS_LOCAL,
941 environment_setup = ""):
945 python_path.replace("'", r"'\''")
946 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
948 if environment_setup:
949 cmd += environment_setup
951 # Uncomment for debug (to run everything under strace)
952 # We had to verify if strace works (cannot nest them)
953 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
955 #cmd += "strace -f -tt -s 200 -o strace$$.out "
957 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
958 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
963 cmd = "sudo bash -c " + shell_escape(cmd)
967 if communication == DC.ACCESS_SSH:
968 tmp_known_hosts = None
970 # Don't bother with localhost. Makes test easier
971 '-o', 'NoHostAuthenticationForLocalhost=yes',
972 '-o', 'ConnectionAttempts=3',
973 '-o', 'ServerAliveInterval=30',
974 '-o', 'TCPKeepAlive=yes',
979 args.append('-p%d' % port)
981 args.extend(('-i', ident_key))
985 # Create a temporary server key file
986 tmp_known_hosts = _make_server_key_args(
987 server_key, host, port, args)
990 args = [ "/bin/bash", "-c", cmd ]
992 # connects to the remote host and starts a remote connection
993 proc = subprocess.Popen(args,
995 stdout = subprocess.PIPE,
996 stdin = subprocess.PIPE,
997 stderr = subprocess.PIPE)
999 if communication == DC.ACCESS_SSH:
1000 proc._known_hosts = tmp_known_hosts
1002 # send the command to execute
1003 os.write(proc.stdin.fileno(),
1004 base64.b64encode(python_code) + "\n")
1008 msg = os.read(proc.stdout.fileno(), 3)
1011 if e.errno == errno.EINTR:
1017 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1018 msg, proc.stdout.read(), proc.stderr.read())
1023 def _communicate(self, input, timeout=None, err_on_timeout=True):
1026 stdout = None # Return
1027 stderr = None # Return
1031 if timeout is not None:
1032 timelimit = time.time() + timeout
1033 killtime = timelimit + 4
1034 bailtime = timelimit + 4
1037 # Flush stdio buffer. This might block, if the user has
1038 # been writing to .stdin in an uncontrolled fashion.
1041 write_set.append(self.stdin)
1045 read_set.append(self.stdout)
1048 read_set.append(self.stderr)
1052 while read_set or write_set:
1053 if timeout is not None:
1054 curtime = time.time()
1055 if timeout is None or curtime > timelimit:
1056 if curtime > bailtime:
1058 elif curtime > killtime:
1059 signum = signal.SIGKILL
1061 signum = signal.SIGTERM
1063 os.kill(self.pid, signum)
1064 select_timeout = 0.5
1066 select_timeout = timelimit - curtime + 0.1
1068 select_timeout = 1.0
1070 if select_timeout > 1.0:
1071 select_timeout = 1.0
1074 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1075 except select.error,e:
1081 if not rlist and not wlist and not xlist and self.poll() is not None:
1082 # timeout and process exited, say bye
1085 if self.stdin in wlist:
1086 # When select has indicated that the file is writable,
1087 # we can write up to PIPE_BUF bytes without risk
1088 # of blocking. POSIX defines PIPE_BUF >= 512
1089 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1090 input_offset += bytes_written
1091 if input_offset >= len(input):
1093 write_set.remove(self.stdin)
1095 if self.stdout in rlist:
1096 data = os.read(self.stdout.fileno(), 1024)
1099 read_set.remove(self.stdout)
1102 if self.stderr in rlist:
1103 data = os.read(self.stderr.fileno(), 1024)
1106 read_set.remove(self.stderr)
1109 # All data exchanged. Translate lists into strings.
1110 if stdout is not None:
1111 stdout = ''.join(stdout)
1112 if stderr is not None:
1113 stderr = ''.join(stderr)
1115 # Translate newlines, if requested. We cannot let the file
1116 # object do the translation: It is based on stdio, which is
1117 # impossible to combine with select (unless forcing no
1119 if self.universal_newlines and hasattr(file, 'newlines'):
1121 stdout = self._translate_newlines(stdout)
1123 stderr = self._translate_newlines(stderr)
1125 if killed and err_on_timeout:
1126 errcode = self.poll()
1127 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1133 return (stdout, stderr)