1 # -*- coding: utf-8 -*-
3 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 OPENSSH_HAS_PERSIST = None
37 if hasattr(os, "devnull"):
40 DEV_NULL = "/dev/null"
42 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
# Process-wide memo of hostname -> IPv4 address lookups.
hostbyname_cache = dict()

def gethostbyname(host):
    """
    Resolve ``host`` to an IPv4 address string, memoizing the answer.

    The first lookup of a name goes through socket.gethostbyname() and is
    stored in ``hostbyname_cache``; later calls for the same name are served
    from the cache without touching the resolver again.

    Resolver failures are not cached: the exception raised by
    socket.gethostbyname() propagates to the caller.
    """
    hostbyname = hostbyname_cache.get(host)
    if hostbyname is None:
        # Cache miss: ask the resolver once and remember the result
        hostbyname = socket.gethostbyname(host)
        hostbyname_cache[host] = hostbyname
    return hostbyname
def openssh_has_persist():
    """
    Tell whether the local ``ssh`` client is recent enough for ControlPersist.

    Runs ``ssh -v`` once, matches the start of its combined stdout/stderr
    against an ``OpenSSH_<version>`` banner (5.8 and later are accepted) and
    stores the boolean answer in the module-level OPENSSH_HAS_PERSIST, so
    the external process is spawned at most once per interpreter.

    Raises whatever subprocess.Popen raises if ``ssh`` cannot be executed.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Give the child an empty stdin, and make sure our handle on
        # /dev/null is released even if spawning ssh fails.
        devnull = open("/dev/null","r")
        try:
            proc = subprocess.Popen(["ssh","-v"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull )
            # stderr is merged into stdout, only the first item is useful
            out = proc.communicate()[0]
        finally:
            devnull.close()

        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
68 """ Escapes strings so that they are safe to use as command-line arguments """
69 if SHELL_SAFE.match(s):
70 # safe string - no escaping needed
73 # unsafe string - escape
75 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
78 return "'$'\\x%02x''" % (ord(c),)
79 s = ''.join(map(escp,s))
82 def eintr_retry(func):
84 @functools.wraps(func)
86 retry = kw.pop("_retry", False)
87 for i in xrange(0 if retry else 4):
90 except (select.error, socket.error), args:
91 if args[0] == errno.EINTR:
96 if e.errno == errno.EINTR:
101 return func(*p, **kw)
104 class Server(object):
105 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
106 environment_setup = "", clean_root = False):
107 self._root_dir = root_dir
108 self._clean_root = clean_root
110 self._ctrl_sock = None
111 self._log_level = log_level
113 self._environment_setup = environment_setup
118 self.post_daemonize()
122 # cannot return normally after fork because no exec was done.
123 # This means that if we don't do a os._exit(0) here the code that
124 # follows the call to "Server.run()" in the "caller code" will be
125 # executed... but by now it has already been executed after the
126 # first process (the one that did the first fork) returned.
129 print >>sys.stderr, "SERVER_ERROR."
133 print >>sys.stderr, "SERVER_READY."
136 # pipes for process synchronization
140 root = os.path.normpath(self._root_dir)
141 if self._root_dir not in [".", ""] and os.path.exists(root) \
142 and self._clean_root:
144 if not os.path.exists(root):
145 os.makedirs(root, 0755)
153 except OSError, e: # pragma: no cover
154 if e.errno == errno.EINTR:
160 # os.waitpid avoids leaving a <defunct> (zombie) process
161 st = os.waitpid(pid1, 0)[1]
163 raise RuntimeError("Daemonization failed")
164 # return 0 to inform the caller method that this is not the
169 # Decouple from parent environment.
170 os.chdir(self._root_dir)
177 # see ref: "os._exit(0)"
180 # close all open file descriptors.
181 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
182 if (max_fd == resource.RLIM_INFINITY):
184 for fd in range(3, max_fd):
191 # Redirect standard file descriptors.
192 stdin = open(DEV_NULL, "r")
193 stderr = stdout = open(STD_ERR, "a", 0)
194 os.dup2(stdin.fileno(), sys.stdin.fileno())
195 # NOTE: sys.stdout.write will still be buffered, even if the file
196 # was opened with 0 buffer
197 os.dup2(stdout.fileno(), sys.stdout.fileno())
198 os.dup2(stderr.fileno(), sys.stderr.fileno())
201 if self._environment_setup:
202 # parse environment variables and pass to child process
203 # do it by executing shell commands, in case there's some heavy setup involved
204 envproc = subprocess.Popen(
206 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
207 ( self._environment_setup, ) ],
208 stdin = subprocess.PIPE,
209 stdout = subprocess.PIPE,
210 stderr = subprocess.PIPE
212 out,err = envproc.communicate()
214 # parse new environment
216 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
218 # apply to current environment
219 for name, value in environment.iteritems():
220 os.environ[name] = value
223 if 'PYTHONPATH' in environment:
224 sys.path = environment['PYTHONPATH'].split(':') + sys.path
226 # create control socket
227 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
229 self._ctrl_sock.bind(CTRL_SOCK)
231 # Address in use, check pidfile
234 pidfile = open(CTRL_PID, "r")
243 # Check process liveliness
244 if not os.path.exists("/proc/%d" % (pid,)):
245 # Ok, it's dead, clean the socket
249 self._ctrl_sock.bind(CTRL_SOCK)
251 self._ctrl_sock.listen(0)
254 pidfile = open(CTRL_PID, "w")
255 pidfile.write(str(os.getpid()))
258 # let the parent process know that the daemonization is finished
263 def post_daemonize(self):
264 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
265 # QT, for some strange reason, redefines the SIGCHLD handler to write
266 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
267 # Server daemonization closes all file descriptors from fileno '3',
268 # but the overloaded handler (inherited by the forked process) will
269 # keep trying to write the \0 to fileno 'x', which might have been reused
270 # after closing, for other operations. This is bad bad bad when fileno 'x'
271 # is in use for communication purposes, because unexpected \0 start
272 # appearing in the communication messages... this is exactly what happens
273 # when using netns in daemonized form. Thus, we have no other alternative than
274 # restoring the SIGCHLD handler to the default here.
276 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
279 while not self._stop:
280 conn, addr = self._ctrl_sock.accept()
281 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
283 while not self._stop:
285 msg = self.recv_msg(conn)
286 except socket.timeout, e:
287 #self.log_error("SERVER recv_msg: connection timedout ")
291 self.log_error("CONNECTION LOST")
296 reply = self.stop_action()
298 reply = self.reply_action(msg)
301 self.send_reply(conn, reply)
304 self.log_error("NOTICE: Awaiting for reconnection")
312 def recv_msg(self, conn):
315 while '\n' not in chunk:
317 chunk = conn.recv(1024)
318 except (OSError, socket.error), e:
319 if e[0] != errno.EINTR:
328 data = ''.join(data).split('\n',1)
331 data, self._rdbuf = data
333 decoded = base64.b64decode(data)
334 return decoded.rstrip()
def send_reply(self, conn, reply):
    """
    Send ``reply`` to the peer as one base64-encoded, newline-terminated line.

    conn  -- connected socket the reply is written to
    reply -- raw reply string; it is base64-encoded so the payload itself
             can never contain the '\\n' that delimits messages

    sendall() is used instead of send(): send() is allowed to transmit only
    part of the buffer, which would leave the peer waiting forever for the
    terminating newline of a large reply.
    """
    encoded = base64.b64encode(reply)
    conn.sendall("%s\n" % encoded)
342 self._ctrl_sock.close()
def stop_action(self):
    """
    Return the fixed reply text this server answers with when stopping.
    """
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """
    Build the reply for an incoming message by echoing it back.

    msg -- the decoded message received from the client

    Returns the string "Reply to: <msg>".  The argument is wrapped in a
    one-element tuple so that a tuple-valued ``msg`` is formatted as a
    single value instead of being unpacked by the % operator.
    """
    return "Reply to: %s" % (msg,)
353 def log_error(self, text = None, context = ''):
355 text = traceback.format_exc()
356 date = time.strftime("%Y-%m-%d %H:%M:%S")
358 context = " (%s)" % (context,)
359 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """
    Write a timestamped DEBUG entry to stderr.

    Does nothing unless the configured log level is DC.DEBUG_LEVEL.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (stamp, text))
367 class Forwarder(object):
368 def __init__(self, root_dir = "."):
369 self._ctrl_sock = None
370 self._root_dir = root_dir
376 print >>sys.stderr, "FORWARDER_READY."
377 while not self._stop:
378 data = self.read_data()
380 # Connection to client lost
382 self.send_to_server(data)
384 data = self.recv_from_server()
386 # Connection to server lost
387 raise IOError, "Connection to server lost while "\
389 self.write_data(data)
393 return sys.stdin.readline()
395 def write_data(self, data):
396 sys.stdout.write(data)
397 # sys.stdout.write is buffered, this is why we need to do a flush()
400 def send_to_server(self, data):
402 self._ctrl_sock.send(data)
403 except (IOError, socket.error), e:
404 if e[0] == errno.EPIPE:
406 self._ctrl_sock.send(data)
409 encoded = data.rstrip()
410 msg = base64.b64decode(encoded)
414 def recv_from_server(self):
417 while '\n' not in chunk:
419 chunk = self._ctrl_sock.recv(1024)
420 except (OSError, socket.error), e:
421 if e[0] != errno.EINTR:
429 data = ''.join(data).split('\n',1)
432 data, self._rdbuf = data
438 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
439 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
440 self._ctrl_sock.connect(sock_addr)
442 def disconnect(self):
444 self._ctrl_sock.close()
448 class Client(object):
449 def __init__(self, root_dir = ".", host = None, port = None, user = None,
450 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
451 environment_setup = ""):
452 self.root_dir = root_dir
453 self.addr = (host, port)
457 self.communication = communication
458 self.environment_setup = environment_setup
459 self._stopped = False
460 self._deferreds = collections.deque()
464 if self._process.poll() is None:
465 os.kill(self._process.pid, signal.SIGTERM)
469 root_dir = self.root_dir
470 (host, port) = self.addr
474 communication = self.communication
476 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
477 c.forward()" % (root_dir,)
479 self._process = popen_python(python_code,
480 communication = communication,
486 environment_setup = self.environment_setup)
488 # Wait for the forwarder to be ready, otherwise nobody
489 # will be able to connect to it
493 helo = self._process.stderr.readline()
494 if helo == 'FORWARDER_READY.\n':
498 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
500 def send_msg(self, msg):
501 encoded = base64.b64encode(msg)
502 data = "%s\n" % encoded
505 self._process.stdin.write(data)
506 except (IOError, ValueError):
507 # dead process, poll it to un-zombify
510 # try again after reconnect
511 # If it fails again, though, give up
513 self._process.stdin.write(data)
516 self.send_msg(STOP_MSG)
519 def defer_reply(self, transform=None):
521 self._deferreds.append(defer_entry)
523 functools.partial(self.read_reply, defer_entry, transform)
526 def _read_reply(self):
527 data = self._process.stdout.readline()
528 encoded = data.rstrip()
530 # empty == eof == dead process, poll it to un-zombify
533 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
534 return base64.b64decode(encoded)
536 def read_reply(self, which=None, transform=None):
537 # Test to see if someone did it already
538 if which is not None and len(which):
540 # ...just return the deferred value
542 return transform(which[0])
546 # Process all deferreds until the one we're looking for
547 # or until the queue is empty
548 while self._deferreds:
550 deferred = self._deferreds.popleft()
555 deferred.append(self._read_reply())
556 if deferred is which:
557 # We reached the one we were looking for
559 return transform(deferred[0])
564 # They've requested a synchronous read
566 return transform(self._read_reply())
568 return self._read_reply()
570 def _make_server_key_args(server_key, host, port, args):
572 Returns a reference to the created temporary file, and adds the
573 corresponding arguments to the given argument list.
575 Make sure to hold onto it until the process is done with the file
578 host = '%s:%s' % (host,port)
579 # Create a temporary server key file
580 tmp_known_hosts = tempfile.NamedTemporaryFile()
582 hostbyname = gethostbyname(host)
584 # Add the intended host key
585 tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
587 # If we're not in strict mode, add user-configured keys
588 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
589 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
590 if os.access(user_hosts_path, os.R_OK):
591 f = open(user_hosts_path, "r")
592 tmp_known_hosts.write(f.read())
595 tmp_known_hosts.flush()
597 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
599 return tmp_known_hosts
601 def popen_ssh_command(command, host, port, user, agent,
608 err_on_timeout = True,
609 connect_timeout = 60,
613 Executes a remote command, returns ((stdout,stderr),process)
616 tmp_known_hosts = None
618 # Don't bother with localhost. Makes test easier
619 '-o', 'NoHostAuthenticationForLocalhost=yes',
620 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
621 '-o', 'ConnectionAttempts=3',
622 '-o', 'ServerAliveInterval=30',
623 '-o', 'TCPKeepAlive=yes',
624 '-l', user, hostip or host]
625 if persistent and openssh_has_persist():
627 '-o', 'ControlMaster=auto',
628 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
629 '-o', 'ControlPersist=60' ])
633 args.append('-p%d' % port)
635 args.extend(('-i', ident_key))
640 # Create a temporary server key file
641 tmp_known_hosts = _make_server_key_args(
642 server_key, host, port, args)
645 for x in xrange(retry or 3):
646 # connects to the remote host and starts a remote connection
647 proc = subprocess.Popen(args,
648 stdout = subprocess.PIPE,
649 stdin = subprocess.PIPE,
650 stderr = subprocess.PIPE)
652 # attach tempfile object to the process, to make sure the file stays
653 # alive until the process is finished with it
654 proc._known_hosts = tmp_known_hosts
657 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
659 print "COMMAND host %s, command %s, out %s, error %s" % (host, " ".join(args), out, err)
662 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
663 # SSH error, can safely retry
666 ControlSocket /tmp/nepi_ssh-inria_alina@planetlab04.cnds.unibe.ch:22 already exists, disabling multiplexing
667 # SSH error, can safely retry (but need to delete controlpath file)
671 # Probably timed out or plain failed but can retry
674 except RuntimeError,e:
676 print "EXCEPTION host %s, command %s, out %s, error %s, exception TIMEOUT -> %s" % (
677 host, " ".join(args), out, err, e.args)
683 return ((out, err), proc)
685 def popen_scp(source, dest,
692 Copies from/to remote sites.
694 Source and destination should have the user and host encoded
697 If source is a file object, a special mode will be used to
698 create the remote file with the same contents.
700 If dest is a file object, the remote file (source) will be
701 read and written into dest.
703 In these modes, recursive cannot be True.
705 Source can be a list of files to copy to a single destination,
706 in which case it is advised that the destination be a folder.
710 print "scp", source, dest
712 if isinstance(source, file) and source.tell() == 0:
714 elif hasattr(source, 'read'):
715 tmp = tempfile.NamedTemporaryFile()
717 buf = source.read(65536)
725 if isinstance(source, file) or isinstance(dest, file) \
726 or hasattr(source, 'read') or hasattr(dest, 'write'):
729 # Parse source/destination as <user>@<server>:<path>
730 if isinstance(dest, basestring) and ':' in dest:
731 remspec, path = dest.split(':',1)
732 elif isinstance(source, basestring) and ':' in source:
733 remspec, path = source.split(':',1)
735 raise ValueError, "Both endpoints cannot be local"
736 user,host = remspec.rsplit('@',1)
737 tmp_known_hosts = None
739 args = ['ssh', '-l', user, '-C',
740 # Don't bother with localhost. Makes test easier
741 '-o', 'NoHostAuthenticationForLocalhost=yes',
742 '-o', 'ConnectTimeout=60',
743 '-o', 'ConnectionAttempts=3',
744 '-o', 'ServerAliveInterval=30',
745 '-o', 'TCPKeepAlive=yes',
747 if openssh_has_persist():
749 '-o', 'ControlMaster=auto',
750 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p',
751 '-o', 'ControlPersist=60' ])
753 args.append('-P%d' % port)
755 args.extend(('-i', ident_key))
757 # Create a temporary server key file
758 tmp_known_hosts = _make_server_key_args(
759 server_key, host, port, args)
761 if isinstance(source, file) or hasattr(source, 'read'):
762 args.append('cat > %s' % (shell_escape(path),))
763 elif isinstance(dest, file) or hasattr(dest, 'write'):
764 args.append('cat %s' % (shell_escape(path),))
766 raise AssertionError, "Unreachable code reached! :-Q"
768 # connects to the remote host and starts a remote connection
769 if isinstance(source, file):
770 proc = subprocess.Popen(args,
771 stdout = open('/dev/null','w'),
772 stderr = subprocess.PIPE,
774 err = proc.stderr.read()
775 proc._known_hosts = tmp_known_hosts
776 eintr_retry(proc.wait)()
777 return ((None,err), proc)
778 elif isinstance(dest, file):
779 proc = subprocess.Popen(args,
780 stdout = open('/dev/null','w'),
781 stderr = subprocess.PIPE,
783 err = proc.stderr.read()
784 proc._known_hosts = tmp_known_hosts
785 eintr_retry(proc.wait)()
786 return ((None,err), proc)
787 elif hasattr(source, 'read'):
788 # file-like (but not file) source
789 proc = subprocess.Popen(args,
790 stdout = open('/dev/null','w'),
791 stderr = subprocess.PIPE,
792 stdin = subprocess.PIPE)
798 buf = source.read(4096)
803 rdrdy, wrdy, broken = select.select(
806 [proc.stderr,proc.stdin])
808 if proc.stderr in rdrdy:
809 # use os.read for fully unbuffered behavior
810 err.append(os.read(proc.stderr.fileno(), 4096))
812 if proc.stdin in wrdy:
813 proc.stdin.write(buf)
819 err.append(proc.stderr.read())
821 proc._known_hosts = tmp_known_hosts
822 eintr_retry(proc.wait)()
823 return ((None,''.join(err)), proc)
824 elif hasattr(dest, 'write'):
825 # file-like (but not file) dest
826 proc = subprocess.Popen(args,
827 stdout = subprocess.PIPE,
828 stderr = subprocess.PIPE,
829 stdin = open('/dev/null','w'))
834 rdrdy, wrdy, broken = select.select(
835 [proc.stderr, proc.stdout],
837 [proc.stderr, proc.stdout])
839 if proc.stderr in rdrdy:
840 # use os.read for fully unbuffered behavior
841 err.append(os.read(proc.stderr.fileno(), 4096))
843 if proc.stdout in rdrdy:
844 # use os.read for fully unbuffered behavior
845 buf = os.read(proc.stdout.fileno(), 4096)
854 err.append(proc.stderr.read())
856 proc._known_hosts = tmp_known_hosts
857 eintr_retry(proc.wait)()
858 return ((None,''.join(err)), proc)
860 raise AssertionError, "Unreachable code reached! :-Q"
862 # Parse destination as <user>@<server>:<path>
863 if isinstance(dest, basestring) and ':' in dest:
864 remspec, path = dest.split(':',1)
865 elif isinstance(source, basestring) and ':' in source:
866 remspec, path = source.split(':',1)
868 raise ValueError, "Both endpoints cannot be local"
869 user,host = remspec.rsplit('@',1)
872 tmp_known_hosts = None
873 args = ['scp', '-q', '-p', '-C',
874 # Don't bother with localhost. Makes test easier
875 '-o', 'NoHostAuthenticationForLocalhost=yes',
876 '-o', 'ConnectTimeout=60',
877 '-o', 'ConnectionAttempts=3',
878 '-o', 'ServerAliveInterval=30',
879 '-o', 'TCPKeepAlive=yes' ]
882 args.append('-P%d' % port)
886 args.extend(('-i', ident_key))
888 # Create a temporary server key file
889 tmp_known_hosts = _make_server_key_args(
890 server_key, host, port, args)
891 if isinstance(source,list):
894 if openssh_has_persist():
896 '-o', 'ControlMaster=auto',
897 '-o', 'ControlPath=/tmp/nepi_ssh-%r@%h:%p'])
901 # connects to the remote host and starts a remote connection
902 proc = subprocess.Popen(args,
903 stdout = subprocess.PIPE,
904 stdin = subprocess.PIPE,
905 stderr = subprocess.PIPE)
906 proc._known_hosts = tmp_known_hosts
908 comm = proc.communicate()
909 eintr_retry(proc.wait)()
912 def decode_and_execute():
913 # The python code we want to execute might have characters that
914 # are not compatible with the 'inline' mode we are using. To avoid
915 # problems we receive the encoded python code in base64 as a input
916 # stream and decode it for execution.
921 cmd += os.read(0, 1)# one byte from stdin
923 if e.errno == errno.EINTR:
929 cmd = base64.b64decode(cmd)
930 # Uncomment for debug
931 #os.write(2, "Executing python code: %s\n" % cmd)
932 os.write(1, "OK\n") # send a sync message
935 def popen_python(python_code,
936 communication = DC.ACCESS_LOCAL,
946 environment_setup = ""):
950 python_path.replace("'", r"'\''")
951 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
953 if environment_setup:
954 cmd += environment_setup
956 # Uncomment for debug (to run everything under strace)
957 # We had to verify if strace works (cannot nest them)
958 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
960 #cmd += "strace -f -tt -s 200 -o strace$$.out "
962 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
963 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
968 cmd = "sudo bash -c " + shell_escape(cmd)
972 if communication == DC.ACCESS_SSH:
973 tmp_known_hosts = None
975 # Don't bother with localhost. Makes test easier
976 '-o', 'NoHostAuthenticationForLocalhost=yes',
977 '-o', 'ConnectionAttempts=3',
978 '-o', 'ServerAliveInterval=30',
979 '-o', 'TCPKeepAlive=yes',
984 args.append('-p%d' % port)
986 args.extend(('-i', ident_key))
990 # Create a temporary server key file
991 tmp_known_hosts = _make_server_key_args(
992 server_key, host, port, args)
995 args = [ "/bin/bash", "-c", cmd ]
997 # connects to the remote host and starts a remote
998 proc = subprocess.Popen(args,
1000 stdout = subprocess.PIPE,
1001 stdin = subprocess.PIPE,
1002 stderr = subprocess.PIPE)
1004 if communication == DC.ACCESS_SSH:
1005 proc._known_hosts = tmp_known_hosts
1007 # send the command to execute
1008 os.write(proc.stdin.fileno(),
1009 base64.b64encode(python_code) + "\n")
1013 msg = os.read(proc.stdout.fileno(), 3)
1016 if e.errno == errno.EINTR:
1022 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1023 msg, proc.stdout.read(), proc.stderr.read())
1028 def _communicate(self, input, timeout=None, err_on_timeout=True):
1031 stdout = None # Return
1032 stderr = None # Return
1036 if timeout is not None:
1037 timelimit = time.time() + timeout
1038 killtime = timelimit + 4
1039 bailtime = timelimit + 4
1042 # Flush stdio buffer. This might block, if the user has
1043 # been writing to .stdin in an uncontrolled fashion.
1046 write_set.append(self.stdin)
1050 read_set.append(self.stdout)
1053 read_set.append(self.stderr)
1057 while read_set or write_set:
1058 if timeout is not None:
1059 curtime = time.time()
1060 if timeout is None or curtime > timelimit:
1061 if curtime > bailtime:
1063 elif curtime > killtime:
1064 signum = signal.SIGKILL
1066 signum = signal.SIGTERM
1068 os.kill(self.pid, signum)
1069 select_timeout = 0.5
1071 select_timeout = timelimit - curtime + 0.1
1073 select_timeout = 1.0
1075 if select_timeout > 1.0:
1076 select_timeout = 1.0
1079 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1080 except select.error,e:
1086 if not rlist and not wlist and not xlist and self.poll() is not None:
1087 # timeout and process exited, say bye
1090 if self.stdin in wlist:
1091 # When select has indicated that the file is writable,
1092 # we can write up to PIPE_BUF bytes without risk
1093 # blocking. POSIX defines PIPE_BUF >= 512
1094 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1095 input_offset += bytes_written
1096 if input_offset >= len(input):
1098 write_set.remove(self.stdin)
1100 if self.stdout in rlist:
1101 data = os.read(self.stdout.fileno(), 1024)
1104 read_set.remove(self.stdout)
1107 if self.stderr in rlist:
1108 data = os.read(self.stderr.fileno(), 1024)
1111 read_set.remove(self.stderr)
1114 # All data exchanged. Translate lists into strings.
1115 if stdout is not None:
1116 stdout = ''.join(stdout)
1117 if stderr is not None:
1118 stderr = ''.join(stderr)
1120 # Translate newlines, if requested. We cannot let the file
1121 # object do the translation: It is based on stdio, which is
1122 # impossible to combine with select (unless forcing no
1124 if self.universal_newlines and hasattr(file, 'newlines'):
1126 stdout = self._translate_newlines(stdout)
1128 stderr = self._translate_newlines(stderr)
1130 if killed and err_on_timeout:
1131 errcode = self.poll()
1132 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1138 return (stdout, stderr)