2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
# File names used for the server's control socket and its stderr log.
27 CTRL_SOCK = "ctrl.sock"
29 STD_ERR = "stderr.log"
# Tracing flag: true when NEPI_TRACE is "true", "1" or "on" (case-insensitive).
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
# Null-device path; "/dev/null" is presumably the fallback used when the os
# module has no devnull attribute -- confirm against the full conditional.
36 if hasattr(os, "devnull"):
39 DEV_NULL = "/dev/null"
# Matches strings made only of characters that need no shell escaping.
41 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
44 """ Escapes strings so that they are safe to use as command-line arguments """
# Fast path: a string fully matched by SHELL_SAFE is treated as already safe.
45 if SHELL_SAFE.match(s):
46 # safe string - no escaping needed
49 # unsafe string - escape
# Per-character test: printable ASCII (32..126) plus CR, LF and TAB, with
# both quote characters excluded.
51 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
# Hex escape of the form '$'\xNN'': presumably closes an enclosing single
# quote, emits an ANSI-C $'\xNN' literal, and reopens the quote -- confirm
# against how the final string is wrapped.
54 return "'$'\\x%02x''" % (ord(c),)
# Apply the per-character escaper to every character and re-join.
55 s = ''.join(map(escp,s))
# Wraps func so that calls interrupted by a signal (EINTR) can be retried.
58 def eintr_retry(func):
60 @functools.wraps(func)
# The optional "_retry" keyword is removed from kw before anything else.
62 retry = kw.pop("_retry", False)
# Up to 4 loop iterations; when _retry is truthy the range is empty and the
# loop body is skipped entirely.
63 for i in xrange(0 if retry else 4):
# select.error / socket.error expose the errno as their first item, while
# the other handler reads it from the .errno attribute.
66 except (select.error, socket.error), args:
67 if args[0] == errno.EINTR:
72 if e.errno == errno.EINTR:
81 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
82 environment_setup = "", clean_root = False):
# root_dir: directory the daemon creates if missing and chdir()s into.
# log_level: exported as NEPI_CONTROLLER_LOGLEVEL by post_daemonize and
#   compared with DC.DEBUG_LEVEL by log_debug.
# environment_setup: shell snippet run during daemonization so that the
#   resulting environment variables can be imported into this process.
# clean_root: enables the clean-up branch for an existing root_dir other
#   than "." or "".
83 self._root_dir = root_dir
84 self._clean_root = clean_root
# The control socket is created later, during daemonization.
86 self._ctrl_sock = None
87 self._log_level = log_level
89 self._environment_setup = environment_setup
# NOTE(review): the quoted text below suggests this fragment belongs to
# Server.run() -- confirm against the method header.
98 # can not return normally after fork because no exec was done.
99 # This means that if we don't do a os._exit(0) here the code that
100 # follows the call to "Server.run()" in the "caller code" will be
101 # executed... but by now it has already been executed after the
102 # first process (the one that did the first fork) returned.
# Status markers written to stderr; presumably consumed by whoever launched
# the server -- confirm against the caller.
105 print >>sys.stderr, "SERVER_ERROR."
109 print >>sys.stderr, "SERVER_READY."
112 # pipes for process synchronization
# Root directory preparation: it is created with mode 0755 when missing; the
# clean_root branch applies only to an existing root other than "." or "".
116 root = os.path.normpath(self._root_dir)
117 if self._root_dir not in [".", ""] and os.path.exists(root) \
118 and self._clean_root:
120 if not os.path.exists(root):
121 os.makedirs(root, 0755)
# EINTR is singled out when the guarded call raises OSError.
129 except OSError, e: # pragma: no cover
130 if e.errno == errno.EINTR:
136 # os.waitpid avoids leaving a <defunct> (zombie) process
137 st = os.waitpid(pid1, 0)[1]
139 raise RuntimeError("Daemonization failed")
140 # return 0 to inform the caller method that this is not the
145 # Decouple from parent environment.
# From here on, relative paths (CTRL_SOCK, STD_ERR) resolve inside root_dir.
146 os.chdir(self._root_dir)
153 # see ref: "os._exit(0)"
156 # close all open file descriptors.
# Index [1] of getrlimit() is the hard limit; descriptors 0-2 are skipped by
# starting the range at 3.
157 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
158 if (max_fd == resource.RLIM_INFINITY):
160 for fd in range(3, max_fd):
167 # Redirect standard file descriptors.
# stdin reads from the null device; stdout and stderr share one unbuffered,
# append-mode handle on the STD_ERR log file.
168 stdin = open(DEV_NULL, "r")
169 stderr = stdout = open(STD_ERR, "a", 0)
170 os.dup2(stdin.fileno(), sys.stdin.fileno())
171 # NOTE: sys.stdout.write will still be buffered, even if the file
172 # was opened with 0 buffer
173 os.dup2(stdout.fileno(), sys.stdout.fileno())
174 os.dup2(stderr.fileno(), sys.stderr.fileno())
177 if self._environment_setup:
178 # parse environment variables and pass to child process
179 # do it by executing shell commands, in case there's some heavy setup involved
# The shell command runs the setup snippet followed by a python one-liner
# that prints os.environ with "\x02" between each name and value and "\x01"
# between entries; "tail -1" keeps only the last output line.
180 envproc = subprocess.Popen(
182 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
183 ( self._environment_setup, ) ],
184 stdin = subprocess.PIPE,
185 stdout = subprocess.PIPE,
186 stderr = subprocess.PIPE
188 out,err = envproc.communicate()
190 # parse new environment
# Inverse of the encoding above: split entries on "\x01", pairs on "\x02".
192 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
194 # apply to current environment
195 for name, value in environment.iteritems():
196 os.environ[name] = value
# os.environ changes do not affect the import path of the running
# interpreter, so PYTHONPATH entries are prepended to sys.path explicitly.
199 if 'PYTHONPATH' in environment:
200 sys.path = environment['PYTHONPATH'].split(':') + sys.path
202 # create control socket
203 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
205 self._ctrl_sock.bind(CTRL_SOCK)
207 # Address in use, check pidfile
210 pidfile = open(CTRL_PID, "r")
219 # Check process liveliness
# Liveness is judged by the presence of /proc/<pid>.
220 if not os.path.exists("/proc/%d" % (pid,)):
221 # Ok, it's dead, clean the socket
225 self._ctrl_sock.bind(CTRL_SOCK)
# A backlog of 0 is requested for the listening socket.
227 self._ctrl_sock.listen(0)
# Record this process's pid in the pid file.
230 pidfile = open(CTRL_PID, "w")
231 pidfile.write(str(os.getpid()))
234 # let the parent process know that the daemonization is finished
# Exports the configured log level through the environment and restores the
# default SIGCHLD handler (rationale in the comment below).
239 def post_daemonize(self):
240 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
241 # QT, for some strange reason, redefines the SIGCHLD handler to write
242 # a \0 to a fd (lets say fileno 'x'), whenever a SIGCHLD is received.
243 # Server daemonization closes all file descriptors from fileno '3',
244 # but the overloaded handler (inherited by the forked process) will
245 # keep trying to write the \0 to fileno 'x', which might have been reused
246 # after closing, for other operations. This is bad bad bad when fileno 'x'
247 # is in use for communication purposes, because unexpected \0 start
248 # appearing in the communication messages... this is exactly what happens
249 # when using netns in daemonized form. Thus, we have no other alternative than
250 # restoring the SIGCHLD handler to the default here.
252 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
# Outer loop: accept control connections until the server is flagged to stop.
255 while not self._stop:
256 conn, addr = self._ctrl_sock.accept()
# log_error is also used for informational messages such as this one.
257 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
# Inner loop: serve messages arriving on the accepted connection.
259 while not self._stop:
261 msg = self.recv_msg(conn)
262 except socket.timeout, e:
263 #self.log_error("SERVER recv_msg: connection timedout ")
267 self.log_error("CONNECTION LOST")
# The reply is produced by stop_action() or by reply_action(msg); presumably
# the former is chosen for a stop message -- confirm against the condition.
272 reply = self.stop_action()
274 reply = self.reply_action(msg)
277 self.send_reply(conn, reply)
280 self.log_error("NOTICE: Awaiting for reconnection")
# Receives one newline-terminated message from conn and returns its
# base64-decoded payload with trailing whitespace removed.
288 def recv_msg(self, conn):
# Keep receiving chunks of at most 1024 bytes until one contains a newline.
291 while '\n' not in chunk:
293 chunk = conn.recv(1024)
294 except (OSError, socket.error), e:
# EINTR is distinguished from every other receive error.
295 if e[0] != errno.EINTR:
# Split at the first newline only: the head is the message and the tail is
# stored in self._rdbuf.
304 data = ''.join(data).split('\n',1)
307 data, self._rdbuf = data
309 decoded = base64.b64decode(data)
310 return decoded.rstrip()
# Sends reply over conn as a single base64-encoded line terminated by "\n".
312 def send_reply(self, conn, reply):
313 encoded = base64.b64encode(reply)
# NOTE(review): send() may transmit fewer bytes than requested; its return
# value is not checked here.
314 conn.send("%s\n" % encoded)
# Closes the server's control socket.
318 self._ctrl_sock.close()
# Returns the fixed reply text used when the server is asked to stop.
323 def stop_action(self):
324 return "Stopping server"
# Builds the reply for an ordinary message by echoing it after "Reply to: ".
326 def reply_action(self, msg):
327 return "Reply to: %s" % msg
# Writes an "ERROR" record to stderr: optional context, timestamp, then text.
329 def log_error(self, text = None, context = ''):
# The formatted current traceback is used as text; presumably only when no
# text was supplied -- confirm against the guarding condition.
331 text = traceback.format_exc()
332 date = time.strftime("%Y-%m-%d %H:%M:%S")
# Context is rendered as " (<context>)" in front of the colon.
334 context = " (%s)" % (context,)
335 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
# Writes a timestamped "DEBUG" record to stderr, but only when the configured
# log level equals DC.DEBUG_LEVEL.
338 def log_debug(self, text):
339 if self._log_level == DC.DEBUG_LEVEL:
340 date = time.strftime("%Y-%m-%d %H:%M:%S")
341 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
# Relays messages between this process's stdin/stdout and the server's UNIX
# control socket found under root_dir.
343 class Forwarder(object):
344 def __init__(self, root_dir = "."):
# The socket is opened later, when connecting to the server.
345 self._ctrl_sock = None
346 self._root_dir = root_dir
# Readiness marker on stderr; the Client class compares the first stderr
# line it reads against 'FORWARDER_READY.\n'.
352 print >>sys.stderr, "FORWARDER_READY."
# Relay loop: one line from stdin goes to the server, and one reply comes
# back to stdout.
353 while not self._stop:
354 data = self.read_data()
356 # Connection to client lost
358 self.send_to_server(data)
360 data = self.recv_from_server()
362 # Connection to server lost
363 raise IOError, "Connection to server lost while "\
365 self.write_data(data)
# One line per message is read from stdin.
369 return sys.stdin.readline()
371 def write_data(self, data):
372 sys.stdout.write(data)
373 # sys.stdout.write is buffered, this is why we need to do a flush()
# Sends data over the control socket; a broken pipe (EPIPE) leads to a
# second send attempt, presumably after reconnecting -- confirm.
376 def send_to_server(self, data):
378 self._ctrl_sock.send(data)
379 except (IOError, socket.error), e:
380 if e[0] == errno.EPIPE:
382 self._ctrl_sock.send(data)
# The outgoing message is decoded locally as well; presumably to detect a
# stop request -- confirm against the code that uses msg.
385 encoded = data.rstrip()
386 msg = base64.b64decode(encoded)
# Reads from the control socket in chunks of at most 1024 bytes until a
# newline arrives.
390 def recv_from_server(self):
393 while '\n' not in chunk:
395 chunk = self._ctrl_sock.recv(1024)
396 except (OSError, socket.error), e:
# EINTR is distinguished from every other receive error.
397 if e[0] != errno.EINTR:
# Split at the first newline only; the remainder is stored in self._rdbuf.
405 data = ''.join(data).split('\n',1)
408 data, self._rdbuf = data
# Opens a UNIX stream socket to <root_dir>/CTRL_SOCK.
414 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
415 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
416 self._ctrl_sock.connect(sock_addr)
418 def disconnect(self):
420 self._ctrl_sock.close()
# Talks to a server through a Forwarder process started with popen_python.
# Messages travel as base64-encoded lines over the forwarder's stdin/stdout.
424 class Client(object):
425 def __init__(self, root_dir = ".", host = None, port = None, user = None,
426 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
427 environment_setup = ""):
428 self.root_dir = root_dir
429 self.addr = (host, port)
433 self.communication = communication
434 self.environment_setup = environment_setup
435 self._stopped = False
# Queue of pending deferred-reply holders, in request order.
436 self._deferreds = collections.deque()
# If the forwarder process is still running, ask it to terminate.
440 if self._process.poll() is None:
441 os.kill(self._process.pid, signal.SIGTERM)
445 root_dir = self.root_dir
446 (host, port) = self.addr
450 communication = self.communication
# Code executed in the spawned interpreter: build a Forwarder for root_dir
# and run its forward() loop.
452 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
453 c.forward()" % (root_dir,)
455 self._process = popen_python(python_code,
456 communication = communication,
462 environment_setup = self.environment_setup)
464 # Wait for the forwarder to be ready, otherwise nobody
465 # will be able to connect to it
469 helo = self._process.stderr.readline()
470 if helo == 'FORWARDER_READY.\n':
474 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
# Encodes msg as one base64 line and writes it to the forwarder's stdin.
476 def send_msg(self, msg):
477 encoded = base64.b64encode(msg)
478 data = "%s\n" % encoded
481 self._process.stdin.write(data)
482 except (IOError, ValueError):
483 # dead process, poll it to un-zombify
486 # try again after reconnect
487 # If it fails again, though, give up
489 self._process.stdin.write(data)
492 self.send_msg(STOP_MSG)
# Registers a pending reply and builds a callable (a partial of read_reply
# bound to this entry and transform) that resolves it later.
495 def defer_reply(self, transform=None):
497 self._deferreds.append(defer_entry)
499 functools.partial(self.read_reply, defer_entry, transform)
# Reads one reply line from the forwarder's stdout and base64-decodes it.
502 def _read_reply(self):
503 data = self._process.stdout.readline()
504 encoded = data.rstrip()
506 # empty == eof == dead process, poll it to un-zombify
509 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
510 return base64.b64decode(encoded)
# which: a deferred-reply holder (list-like; filled with one element once
#   resolved) or None for a synchronous read.
# transform: optional callable applied to the raw reply before returning.
512 def read_reply(self, which=None, transform=None):
513 # Test to see if someone did it already
514 if which is not None and len(which):
516 # ...just return the deferred value
518 return transform(which[0])
522 # Process all deferreds until the one we're looking for
523 # or until the queue is empty
524 while self._deferreds:
526 deferred = self._deferreds.popleft()
# Replies arrive in request order, so each queued holder takes the next one.
531 deferred.append(self._read_reply())
532 if deferred is which:
533 # We reached the one we were looking for
535 return transform(deferred[0])
540 # They've requested a synchronous read
542 return transform(self._read_reply())
544 return self._read_reply()
# Writes a temporary known_hosts file containing server_key for the target
# host and appends the matching UserKnownHostsFile option to args.
546 def _make_server_key_args(server_key, host, port, args):
548 Returns a reference to the created temporary file, and adds the
549 corresponding arguments to the given argument list.
551 Make sure to hold onto it until the process is done with the file
554 host = '%s:%s' % (host,port)
555 # Create a temporary server key file
556 tmp_known_hosts = tempfile.NamedTemporaryFile()
558 # Add the intended host key
# NOTE(review): host may already carry ":port" at this point (see the
# formatting above), which gethostbyname() would not resolve -- confirm.
559 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
561 # If we're not in strict mode, add user-configured keys
# Strict mode is on when NEPI_STRICT_AUTH_MODE is "1", "true" or "on"
# (case-insensitive).
562 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
563 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
564 if os.access(user_hosts_path, os.R_OK):
565 f = open(user_hosts_path, "r")
566 tmp_known_hosts.write(f.read())
# Flush so that the content is on disk when the file is read by name.
569 tmp_known_hosts.flush()
571 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
573 return tmp_known_hosts
# Derives a string key from (user, host, port); callers embed it in the ssh
# ControlPath file name.
575 def make_connkey(user, host, port):
# base64 of the tuple's repr, with '/' replaced by '.'.
576 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
# Keys longer than 60 characters are replaced by their SHA-1 hex digest.
577 if len(connkey) > 60:
578 connkey = hashlib.sha1(connkey).hexdigest()
# Runs command on a remote host through ssh (with connection sharing via
# ControlMaster) and collects its output using _communicate.
581 def popen_ssh_command(command, host, port, user, agent,
588 err_on_timeout = True,
589 connect_timeout = 30,
592 Executes a remote commands, returns ((stdout,stderr),process)
595 print "ssh", host, command
597 tmp_known_hosts = None
# The connection key makes the ControlPath below unique per (user, host, port).
598 connkey = make_connkey(user,host,port)
600 # Don't bother with localhost. Makes test easier
601 '-o', 'NoHostAuthenticationForLocalhost=yes',
602 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
603 '-o', 'ConnectionAttempts=3',
604 '-o', 'ServerAliveInterval=30',
605 '-o', 'TCPKeepAlive=yes',
609 '-o', 'ControlMaster=auto',
610 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
611 '-o', 'ControlPersist=60' ])
615 args.append('-p%d' % port)
617 args.extend(('-i', ident_key))
621 # Create a temporary server key file
622 tmp_known_hosts = _make_server_key_args(
623 server_key, host, port, args)
# Number of attempts: the retry value when truthy, otherwise 3.
626 for x in xrange(retry or 3):
627 # connects to the remote host and starts a remote connection
628 proc = subprocess.Popen(args,
629 stdout = subprocess.PIPE,
630 stdin = subprocess.PIPE,
631 stderr = subprocess.PIPE)
633 # attach tempfile object to the process, to make sure the file stays
634 # alive until the process is finished with it
635 proc._known_hosts = tmp_known_hosts
638 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
# A non-zero exit status with stderr starting with "ssh: " is treated as a
# failure of ssh itself rather than of the remote command.
639 if proc.poll() and err.strip().startswith('ssh: '):
640 # SSH error, can safely retry
# _communicate raises RuntimeError when the operation times out.
643 except RuntimeError,e:
647 print " timedout -> ", e.args
651 print " -> ", out, err
653 return ((out, err), proc)
# Copies between local and remote endpoints. File or file-like endpoints are
# streamed through "ssh ... cat"; plain path endpoints use the scp program.
655 def popen_scp(source, dest,
662 Copies from/to remote sites.
664 Source and destination should have the user and host encoded
667 If source is a file object, a special mode will be used to
668 create the remote file with the same contents.
670 If dest is a file object, the remote file (source) will be
671 read and written into dest.
673 In these modes, recursive cannot be True.
675 Source can be a list of files to copy to a single destination,
676 in which case it is advised that the destination be a folder.
680 print "scp", source, dest
# A real file positioned at offset 0 is distinguished from other readable
# sources; the latter are copied in 64 KiB reads, with a temporary file
# created for them.
682 if isinstance(source, file) and source.tell() == 0:
684 elif hasattr(source, 'read'):
685 tmp = tempfile.NamedTemporaryFile()
687 buf = source.read(65536)
# Streaming mode: at least one endpoint is a file or file-like object.
695 if isinstance(source, file) or isinstance(dest, file) \
696 or hasattr(source, 'read') or hasattr(dest, 'write'):
699 # Parse source/destination as <user>@<server>:<path>
700 if isinstance(dest, basestring) and ':' in dest:
701 remspec, path = dest.split(':',1)
702 elif isinstance(source, basestring) and ':' in source:
703 remspec, path = source.split(':',1)
705 raise ValueError, "Both endpoints cannot be local"
# rsplit keeps any earlier '@' characters inside the user part.
706 user,host = remspec.rsplit('@',1)
707 tmp_known_hosts = None
709 connkey = make_connkey(user,host,port)
710 args = ['ssh', '-l', user, '-C',
711 # Don't bother with localhost. Makes test easier
712 '-o', 'NoHostAuthenticationForLocalhost=yes',
713 '-o', 'ConnectTimeout=30',
714 '-o', 'ConnectionAttempts=3',
715 '-o', 'ServerAliveInterval=30',
716 '-o', 'TCPKeepAlive=yes',
717 '-o', 'ControlMaster=auto',
718 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
719 '-o', 'ControlPersist=60',
# NOTE(review): this argument list invokes ssh, whose port option is
# lowercase -p (popen_ssh_command uses '-p%d'); uppercase -P is the scp
# spelling -- confirm.
722 args.append('-P%d' % port)
724 args.extend(('-i', ident_key))
726 # Create a temporary server key file
727 tmp_known_hosts = _make_server_key_args(
728 server_key, host, port, args)
# Remote command: "cat > path" receives an upload on stdin and "cat path"
# emits a download on stdout; the path is shell-escaped in both cases.
730 if isinstance(source, file) or hasattr(source, 'read'):
731 args.append('cat > %s' % (shell_escape(path),))
732 elif isinstance(dest, file) or hasattr(dest, 'write'):
733 args.append('cat %s' % (shell_escape(path),))
735 raise AssertionError, "Unreachable code reached! :-Q"
737 # connects to the remote host and starts a remote connection
738 if isinstance(source, file):
739 proc = subprocess.Popen(args,
740 stdout = open('/dev/null','w'),
741 stderr = subprocess.PIPE,
743 err = proc.stderr.read()
744 proc._known_hosts = tmp_known_hosts
745 eintr_retry(proc.wait)()
746 return ((None,err), proc)
# NOTE(review): in this dest-is-a-file branch stdout is still sent to
# /dev/null, although the docstring says the remote content is written into
# dest -- confirm how dest is wired to the process.
747 elif isinstance(dest, file):
748 proc = subprocess.Popen(args,
749 stdout = open('/dev/null','w'),
750 stderr = subprocess.PIPE,
752 err = proc.stderr.read()
753 proc._known_hosts = tmp_known_hosts
754 eintr_retry(proc.wait)()
755 return ((None,err), proc)
756 elif hasattr(source, 'read'):
757 # file-like (but not file) source
758 proc = subprocess.Popen(args,
759 stdout = open('/dev/null','w'),
760 stderr = subprocess.PIPE,
761 stdin = subprocess.PIPE)
# The source is pumped in 4096-byte reads while stderr is drained through
# select(), so neither pipe can fill up and stall the transfer.
767 buf = source.read(4096)
772 rdrdy, wrdy, broken = select.select(
775 [proc.stderr,proc.stdin])
777 if proc.stderr in rdrdy:
778 # use os.read for fully unbuffered behavior
779 err.append(os.read(proc.stderr.fileno(), 4096))
781 if proc.stdin in wrdy:
782 proc.stdin.write(buf)
788 err.append(proc.stderr.read())
790 proc._known_hosts = tmp_known_hosts
791 eintr_retry(proc.wait)()
792 return ((None,''.join(err)), proc)
793 elif hasattr(dest, 'write'):
794 # file-like (but not file) dest
# NOTE(review): the null device is opened in write mode for use as stdin.
795 proc = subprocess.Popen(args,
796 stdout = subprocess.PIPE,
797 stderr = subprocess.PIPE,
798 stdin = open('/dev/null','w'))
803 rdrdy, wrdy, broken = select.select(
804 [proc.stderr, proc.stdout],
806 [proc.stderr, proc.stdout])
808 if proc.stderr in rdrdy:
809 # use os.read for fully unbuffered behavior
810 err.append(os.read(proc.stderr.fileno(), 4096))
812 if proc.stdout in rdrdy:
813 # use os.read for fully unbuffered behavior
814 buf = os.read(proc.stdout.fileno(), 4096)
823 err.append(proc.stderr.read())
825 proc._known_hosts = tmp_known_hosts
826 eintr_retry(proc.wait)()
827 return ((None,''.join(err)), proc)
829 raise AssertionError, "Unreachable code reached! :-Q"
# Path-to-path mode: delegate the copy to the scp program.
831 # Parse destination as <user>@<server>:<path>
832 if isinstance(dest, basestring) and ':' in dest:
833 remspec, path = dest.split(':',1)
834 elif isinstance(source, basestring) and ':' in source:
835 remspec, path = source.split(':',1)
837 raise ValueError, "Both endpoints cannot be local"
838 user,host = remspec.rsplit('@',1)
841 tmp_known_hosts = None
842 args = ['scp', '-q', '-p', '-C',
843 # Don't bother with localhost. Makes test easier
844 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
846 args.append('-P%d' % port)
850 args.extend(('-i', ident_key))
852 # Create a temporary server key file
853 tmp_known_hosts = _make_server_key_args(
854 server_key, host, port, args)
855 if isinstance(source,list):
861 # connects to the remote host and starts a remote connection
862 proc = subprocess.Popen(args,
863 stdout = subprocess.PIPE,
864 stdin = subprocess.PIPE,
865 stderr = subprocess.PIPE)
# Keep the temporary known_hosts file alive as long as the process object.
866 proc._known_hosts = tmp_known_hosts
868 comm = proc.communicate()
869 eintr_retry(proc.wait)()
# Entry point run by the spawned interpreter: reads base64-encoded python
# code from stdin, decodes it and acknowledges with "OK\n" on stdout.
872 def decode_and_execute():
873 # The python code we want to execute might have characters that
874 # are not compatible with the 'inline' mode we are using. To avoid
875 # problems we receive the encoded python code in base64 as a input
876 # stream and decode it for execution.
# Reading one byte at a time straight from fd 0 avoids consuming input
# beyond the command itself.
881 cmd += os.read(0, 1)# one byte from stdin
883 if e.errno == errno.EINTR:
889 cmd = base64.b64decode(cmd)
890 # Uncomment for debug
891 #os.write(2, "Executing python code: %s\n" % cmd)
892 os.write(1, "OK\n") # send a sync message
# Starts a python interpreter, locally through /bin/bash or remotely through
# ssh, that runs decode_and_execute(), then sends it python_code in base64.
895 def popen_python(python_code,
896 communication = DC.ACCESS_LOCAL,
906 environment_setup = ""):
# NOTE(review): str.replace returns a new string and the result is not
# assigned here, so python_path looks unchanged when interpolated below --
# confirm whether the quote escaping is meant to take effect.
910 python_path.replace("'", r"'\''")
911 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
913 if environment_setup:
914 cmd += environment_setup
916 # Uncomment for debug (to run everything under strace)
917 # We had to verify if strace works (cannot nest them)
918 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
920 #cmd += "strace -f -tt -s 200 -o strace$$.out "
# The directory two levels above nepi/__init__ is put first on sys.path; the
# repr's single quotes are swapped for double quotes because the -c
# argument itself is single-quoted.
922 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
923 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
# The whole command is shell-escaped and handed to "sudo bash -c".
928 cmd = "sudo bash -c " + shell_escape(cmd)
932 if communication == DC.ACCESS_SSH:
933 tmp_known_hosts = None
935 # Don't bother with localhost. Makes test easier
936 '-o', 'NoHostAuthenticationForLocalhost=yes',
937 '-o', 'ConnectionAttempts=3',
938 '-o', 'ServerAliveInterval=30',
939 '-o', 'TCPKeepAlive=yes',
944 args.append('-p%d' % port)
946 args.extend(('-i', ident_key))
950 # Create a temporary server key file
951 tmp_known_hosts = _make_server_key_args(
952 server_key, host, port, args)
# Alternative argument list: run the command through a local bash.
955 args = [ "/bin/bash", "-c", cmd ]
957 # connects to the remote host and starts a remote
958 proc = subprocess.Popen(args,
960 stdout = subprocess.PIPE,
961 stdin = subprocess.PIPE,
962 stderr = subprocess.PIPE)
# Keep the temporary known_hosts file alive as long as the process object.
964 if communication == DC.ACCESS_SSH:
965 proc._known_hosts = tmp_known_hosts
967 # send the command to execute
968 os.write(proc.stdin.fileno(),
969 base64.b64encode(python_code) + "\n")
# Three bytes are read back, matching the length of the "OK\n" sync message
# written by decode_and_execute.
973 msg = os.read(proc.stdout.fileno(), 3)
976 if e.errno == errno.EINTR:
982 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
983 msg, proc.stdout.read(), proc.stderr.read())
# select()-based exchange with a child process, with optional timeout.
# self: the process object (callers pass a subprocess.Popen instance).
# input: data written to the child's stdin.
# timeout: seconds before the child is signalled; None disables the limit.
# err_on_timeout: raise RuntimeError when the child had to be killed.
# Returns (stdout, stderr).
988 def _communicate(self, input, timeout=None, err_on_timeout=True):
991 stdout = None # Return
992 stderr = None # Return
996 if timeout is not None:
997 timelimit = time.time() + timeout
# NOTE(review): killtime and bailtime are identical, and the bailtime test
# comes first below, so the SIGKILL branch looks unreachable -- confirm the
# intended offsets.
998 killtime = timelimit + 4
999 bailtime = timelimit + 4
1002 # Flush stdio buffer. This might block, if the user has
1003 # been writing to .stdin in an uncontrolled fashion.
1006 write_set.append(self.stdin)
1010 read_set.append(self.stdout)
1013 read_set.append(self.stderr)
# Loop until every pipe has been removed from the read and write sets.
1017 while read_set or write_set:
1018 if timeout is not None:
1019 curtime = time.time()
1020 if timeout is None or curtime > timelimit:
1021 if curtime > bailtime:
1023 elif curtime > killtime:
1024 signum = signal.SIGKILL
1026 signum = signal.SIGTERM
1028 os.kill(self.pid, signum)
1029 select_timeout = 0.5
1031 select_timeout = timelimit - curtime + 0.1
1033 select_timeout = 1.0
# Cap each wait at one second.
1035 if select_timeout > 1.0:
1036 select_timeout = 1.0
1039 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1040 except select.error,e:
1046 if not rlist and not wlist and not xlist and self.poll() is not None:
1047 # timeout and process exited, say bye
1050 if self.stdin in wlist:
1051 # When select has indicated that the file is writable,
1052 # we can write up to PIPE_BUF bytes without risk
1053 # blocking. POSIX defines PIPE_BUF >= 512
# buffer() gives a view of at most 512 bytes starting at input_offset.
1054 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1055 input_offset += bytes_written
1056 if input_offset >= len(input):
1058 write_set.remove(self.stdin)
1060 if self.stdout in rlist:
1061 data = os.read(self.stdout.fileno(), 1024)
1064 read_set.remove(self.stdout)
1067 if self.stderr in rlist:
1068 data = os.read(self.stderr.fileno(), 1024)
1071 read_set.remove(self.stderr)
1074 # All data exchanged. Translate lists into strings.
1075 if stdout is not None:
1076 stdout = ''.join(stdout)
1077 if stderr is not None:
1078 stderr = ''.join(stderr)
1080 # Translate newlines, if requested. We cannot let the file
1081 # object do the translation: It is based on stdio, which is
1082 # impossible to combine with select (unless forcing no
1084 if self.universal_newlines and hasattr(file, 'newlines'):
1086 stdout = self._translate_newlines(stdout)
1088 stderr = self._translate_newlines(stderr)
# The exception carries the message, the exit status from poll() and
# whatever output was collected before the kill.
1090 if killed and err_on_timeout:
1091 errcode = self.poll()
1092 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1098 return (stdout, stderr)