2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
26 CTRL_SOCK = "ctrl.sock"
28 STD_ERR = "stderr.log"
33 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
35 if hasattr(os, "devnull"):
38 DEV_NULL = "/dev/null"
40 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
43 """ Escapes strings so that they are safe to use as command-line arguments """
44 if SHELL_SAFE.match(s):
45 # safe string - no escaping needed
48 # unsafe string - escape
50 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
53 return "'$'\\x%02x''" % (ord(c),)
54 s = ''.join(map(escp,s))
57 def eintr_retry(func):
59 @functools.wraps(func)
61 retry = kw.pop("_retry", False)
62 for i in xrange(0 if retry else 4):
65 except (select.error, socket.error), args:
66 if args[0] == errno.EINTR:
71 if e.errno == errno.EINTR:
80 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
81 environment_setup = "", clean_root = False):
82 self._root_dir = root_dir
83 self._clean_root = clean_root
85 self._ctrl_sock = None
86 self._log_level = log_level
88 self._environment_setup = environment_setup
97 # can not return normally after fork beacuse no exec was done.
98 # This means that if we don't do a os._exit(0) here the code that
99 # follows the call to "Server.run()" in the "caller code" will be
100 # executed... but by now it has already been executed after the
101 # first process (the one that did the first fork) returned.
104 print >>sys.stderr, "SERVER_ERROR."
108 print >>sys.stderr, "SERVER_READY."
111 # pipes for process synchronization
115 root = os.path.normpath(self._root_dir)
116 if self._root_dir not in [".", ""] and os.path.exists(root) \
117 and self._clean_root:
119 if not os.path.exists(root):
120 os.makedirs(root, 0755)
128 except OSError, e: # pragma: no cover
129 if e.errno == errno.EINTR:
135 # os.waitpid avoids leaving a <defunc> (zombie) process
136 st = os.waitpid(pid1, 0)[1]
138 raise RuntimeError("Daemonization failed")
139 # return 0 to inform the caller method that this is not the
144 # Decouple from parent environment.
145 os.chdir(self._root_dir)
152 # see ref: "os._exit(0)"
155 # close all open file descriptors.
156 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
157 if (max_fd == resource.RLIM_INFINITY):
159 for fd in range(3, max_fd):
166 # Redirect standard file descriptors.
167 stdin = open(DEV_NULL, "r")
168 stderr = stdout = open(STD_ERR, "a", 0)
169 os.dup2(stdin.fileno(), sys.stdin.fileno())
170 # NOTE: sys.stdout.write will still be buffered, even if the file
171 # was opened with 0 buffer
172 os.dup2(stdout.fileno(), sys.stdout.fileno())
173 os.dup2(stderr.fileno(), sys.stderr.fileno())
176 if self._environment_setup:
177 # parse environment variables and pass to child process
178 # do it by executing shell commands, in case there's some heavy setup involved
179 envproc = subprocess.Popen(
181 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
182 ( self._environment_setup, ) ],
183 stdin = subprocess.PIPE,
184 stdout = subprocess.PIPE,
185 stderr = subprocess.PIPE
187 out,err = envproc.communicate()
189 # parse new environment
191 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
193 # apply to current environment
194 for name, value in environment.iteritems():
195 os.environ[name] = value
198 if 'PYTHONPATH' in environment:
199 sys.path = environment['PYTHONPATH'].split(':') + sys.path
201 # create control socket
202 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
204 self._ctrl_sock.bind(CTRL_SOCK)
206 # Address in use, check pidfile
209 pidfile = open(CTRL_PID, "r")
218 # Check process liveliness
219 if not os.path.exists("/proc/%d" % (pid,)):
220 # Ok, it's dead, clean the socket
224 self._ctrl_sock.bind(CTRL_SOCK)
226 self._ctrl_sock.listen(0)
229 pidfile = open(CTRL_PID, "w")
230 pidfile.write(str(os.getpid()))
233 # let the parent process know that the daemonization is finished
238 def post_daemonize(self):
239 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
240 # QT, for some strange reason, redefines the SIGCHILD handler to write
241 # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
242 # Server dameonization closes all file descriptors from fileno '3',
243 # but the overloaded handler (inherited by the forked process) will
244 # keep trying to write the \0 to fileno 'x', which might have been reused
245 # after closing, for other operations. This is bad bad bad when fileno 'x'
246 # is in use for communication pouroses, because unexpected \0 start
247 # appearing in the communication messages... this is exactly what happens
248 # when using netns in daemonized form. Thus, be have no other alternative than
249 # restoring the SIGCHLD handler to the default here.
251 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
254 while not self._stop:
255 conn, addr = self._ctrl_sock.accept()
256 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
258 while not self._stop:
260 msg = self.recv_msg(conn)
261 except socket.timeout, e:
262 #self.log_error("SERVER recv_msg: connection timedout ")
266 self.log_error("CONNECTION LOST")
271 reply = self.stop_action()
273 reply = self.reply_action(msg)
276 self.send_reply(conn, reply)
279 self.log_error("NOTICE: Awaiting for reconnection")
287 def recv_msg(self, conn):
290 while '\n' not in chunk:
292 chunk = conn.recv(1024)
293 except (OSError, socket.error), e:
294 if e[0] != errno.EINTR:
303 data = ''.join(data).split('\n',1)
306 data, self._rdbuf = data
308 decoded = base64.b64decode(data)
309 return decoded.rstrip()
311 def send_reply(self, conn, reply):
312 encoded = base64.b64encode(reply)
313 conn.send("%s\n" % encoded)
317 self._ctrl_sock.close()
322 def stop_action(self):
323 return "Stopping server"
325 def reply_action(self, msg):
326 return "Reply to: %s" % msg
328 def log_error(self, text = None, context = ''):
330 text = traceback.format_exc()
331 date = time.strftime("%Y-%m-%d %H:%M:%S")
333 context = " (%s)" % (context,)
334 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
337 def log_debug(self, text):
338 if self._log_level == DC.DEBUG_LEVEL:
339 date = time.strftime("%Y-%m-%d %H:%M:%S")
340 sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
342 class Forwarder(object):
343 def __init__(self, root_dir = "."):
344 self._ctrl_sock = None
345 self._root_dir = root_dir
351 print >>sys.stderr, "FORWARDER_READY."
352 while not self._stop:
353 data = self.read_data()
355 # Connection to client lost
357 self.send_to_server(data)
359 data = self.recv_from_server()
361 # Connection to server lost
362 raise IOError, "Connection to server lost while "\
364 self.write_data(data)
368 return sys.stdin.readline()
370 def write_data(self, data):
371 sys.stdout.write(data)
372 # sys.stdout.write is buffered, this is why we need to do a flush()
375 def send_to_server(self, data):
377 self._ctrl_sock.send(data)
378 except (IOError, socket.error), e:
379 if e[0] == errno.EPIPE:
381 self._ctrl_sock.send(data)
384 encoded = data.rstrip()
385 msg = base64.b64decode(encoded)
389 def recv_from_server(self):
392 while '\n' not in chunk:
394 chunk = self._ctrl_sock.recv(1024)
395 except (OSError, socket.error), e:
396 if e[0] != errno.EINTR:
404 data = ''.join(data).split('\n',1)
407 data, self._rdbuf = data
413 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
415 self._ctrl_sock.connect(sock_addr)
417 def disconnect(self):
419 self._ctrl_sock.close()
423 class Client(object):
424 def __init__(self, root_dir = ".", host = None, port = None, user = None,
425 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
426 environment_setup = ""):
427 self.root_dir = root_dir
428 self.addr = (host, port)
432 self.communication = communication
433 self.environment_setup = environment_setup
434 self._stopped = False
435 self._deferreds = collections.deque()
439 if self._process.poll() is None:
440 os.kill(self._process.pid, signal.SIGTERM)
444 root_dir = self.root_dir
445 (host, port) = self.addr
449 communication = self.communication
451 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
452 c.forward()" % (root_dir,)
454 self._process = popen_python(python_code,
455 communication = communication,
461 environment_setup = self.environment_setup)
463 # Wait for the forwarder to be ready, otherwise nobody
464 # will be able to connect to it
468 helo = self._process.stderr.readline()
469 if helo == 'FORWARDER_READY.\n':
473 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
475 def send_msg(self, msg):
476 encoded = base64.b64encode(msg)
477 data = "%s\n" % encoded
480 self._process.stdin.write(data)
481 except (IOError, ValueError):
482 # dead process, poll it to un-zombify
485 # try again after reconnect
486 # If it fails again, though, give up
488 self._process.stdin.write(data)
491 self.send_msg(STOP_MSG)
494 def defer_reply(self, transform=None):
496 self._deferreds.append(defer_entry)
498 functools.partial(self.read_reply, defer_entry, transform)
501 def _read_reply(self):
502 data = self._process.stdout.readline()
503 encoded = data.rstrip()
505 # empty == eof == dead process, poll it to un-zombify
508 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
509 return base64.b64decode(encoded)
511 def read_reply(self, which=None, transform=None):
512 # Test to see if someone did it already
513 if which is not None and len(which):
515 # ...just return the deferred value
517 return transform(which[0])
521 # Process all deferreds until the one we're looking for
522 # or until the queue is empty
523 while self._deferreds:
525 deferred = self._deferreds.popleft()
530 deferred.append(self._read_reply())
531 if deferred is which:
532 # We reached the one we were looking for
534 return transform(deferred[0])
539 # They've requested a synchronous read
541 return transform(self._read_reply())
543 return self._read_reply()
545 def _make_server_key_args(server_key, host, port, args):
547 Returns a reference to the created temporary file, and adds the
548 corresponding arguments to the given argument list.
550 Make sure to hold onto it until the process is done with the file
553 host = '%s:%s' % (host,port)
554 # Create a temporary server key file
555 tmp_known_hosts = tempfile.NamedTemporaryFile()
557 # Add the intended host key
558 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
560 # If we're not in strict mode, add user-configured keys
561 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
562 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
563 if os.access(user_hosts_path, os.R_OK):
564 f = open(user_hosts_path, "r")
565 tmp_known_hosts.write(f.read())
568 tmp_known_hosts.flush()
570 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
572 return tmp_known_hosts
574 def popen_ssh_command(command, host, port, user, agent,
581 err_on_timeout = True,
582 connect_timeout = 30):
584 Executes a remote commands, returns ((stdout,stderr),process)
587 print "ssh", host, command
589 tmp_known_hosts = None
590 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
592 # Don't bother with localhost. Makes test easier
593 '-o', 'NoHostAuthenticationForLocalhost=yes',
594 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
595 '-o', 'ConnectionAttempts=3',
596 '-o', 'ServerAliveInterval=30',
597 '-o', 'TCPKeepAlive=yes',
598 '-o', 'ControlMaster=auto',
599 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
600 '-o', 'ControlPersist=60',
605 args.append('-p%d' % port)
607 args.extend(('-i', ident_key))
611 # Create a temporary server key file
612 tmp_known_hosts = _make_server_key_args(
613 server_key, host, port, args)
616 for x in xrange(retry or 3):
617 # connects to the remote host and starts a remote connection
618 proc = subprocess.Popen(args,
619 stdout = subprocess.PIPE,
620 stdin = subprocess.PIPE,
621 stderr = subprocess.PIPE)
623 # attach tempfile object to the process, to make sure the file stays
624 # alive until the process is finished with it
625 proc._known_hosts = tmp_known_hosts
628 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
629 if proc.poll() and err.strip().startswith('ssh: '):
630 # SSH error, can safely retry
633 except RuntimeError,e:
637 print " timedout -> ", e.args
641 print " -> ", out, err
643 return ((out, err), proc)
645 def popen_scp(source, dest,
652 Copies from/to remote sites.
654 Source and destination should have the user and host encoded
657 If source is a file object, a special mode will be used to
658 create the remote file with the same contents.
660 If dest is a file object, the remote file (source) will be
661 read and written into dest.
663 In these modes, recursive cannot be True.
665 Source can be a list of files to copy to a single destination,
666 in which case it is advised that the destination be a folder.
670 print "scp", source, dest
672 if isinstance(source, file) and source.tell() == 0:
674 elif hasattr(source, 'read'):
675 tmp = tempfile.NamedTemporaryFile()
677 buf = source.read(65536)
685 if isinstance(source, file) or isinstance(dest, file) \
686 or hasattr(source, 'read') or hasattr(dest, 'write'):
689 # Parse source/destination as <user>@<server>:<path>
690 if isinstance(dest, basestring) and ':' in dest:
691 remspec, path = dest.split(':',1)
692 elif isinstance(source, basestring) and ':' in source:
693 remspec, path = source.split(':',1)
695 raise ValueError, "Both endpoints cannot be local"
696 user,host = remspec.rsplit('@',1)
697 tmp_known_hosts = None
699 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
700 args = ['ssh', '-l', user, '-C',
701 # Don't bother with localhost. Makes test easier
702 '-o', 'NoHostAuthenticationForLocalhost=yes',
703 '-o', 'ConnectTimeout=30',
704 '-o', 'ConnectionAttempts=3',
705 '-o', 'ServerAliveInterval=30',
706 '-o', 'TCPKeepAlive=yes',
707 '-o', 'ControlMaster=auto',
708 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
709 '-o', 'ControlPersist=60',
712 args.append('-P%d' % port)
714 args.extend(('-i', ident_key))
716 # Create a temporary server key file
717 tmp_known_hosts = _make_server_key_args(
718 server_key, host, port, args)
720 if isinstance(source, file) or hasattr(source, 'read'):
721 args.append('cat > %s' % (shell_escape(path),))
722 elif isinstance(dest, file) or hasattr(dest, 'write'):
723 args.append('cat %s' % (shell_escape(path),))
725 raise AssertionError, "Unreachable code reached! :-Q"
727 # connects to the remote host and starts a remote connection
728 if isinstance(source, file):
729 proc = subprocess.Popen(args,
730 stdout = open('/dev/null','w'),
731 stderr = subprocess.PIPE,
733 err = proc.stderr.read()
734 proc._known_hosts = tmp_known_hosts
735 eintr_retry(proc.wait)()
736 return ((None,err), proc)
737 elif isinstance(dest, file):
738 proc = subprocess.Popen(args,
739 stdout = open('/dev/null','w'),
740 stderr = subprocess.PIPE,
742 err = proc.stderr.read()
743 proc._known_hosts = tmp_known_hosts
744 eintr_retry(proc.wait)()
745 return ((None,err), proc)
746 elif hasattr(source, 'read'):
747 # file-like (but not file) source
748 proc = subprocess.Popen(args,
749 stdout = open('/dev/null','w'),
750 stderr = subprocess.PIPE,
751 stdin = subprocess.PIPE)
757 buf = source.read(4096)
762 rdrdy, wrdy, broken = select.select(
765 [proc.stderr,proc.stdin])
767 if proc.stderr in rdrdy:
768 # use os.read for fully unbuffered behavior
769 err.append(os.read(proc.stderr.fileno(), 4096))
771 if proc.stdin in wrdy:
772 proc.stdin.write(buf)
778 err.append(proc.stderr.read())
780 proc._known_hosts = tmp_known_hosts
781 eintr_retry(proc.wait)()
782 return ((None,''.join(err)), proc)
783 elif hasattr(dest, 'write'):
784 # file-like (but not file) dest
785 proc = subprocess.Popen(args,
786 stdout = subprocess.PIPE,
787 stderr = subprocess.PIPE,
788 stdin = open('/dev/null','w'))
793 rdrdy, wrdy, broken = select.select(
794 [proc.stderr, proc.stdout],
796 [proc.stderr, proc.stdout])
798 if proc.stderr in rdrdy:
799 # use os.read for fully unbuffered behavior
800 err.append(os.read(proc.stderr.fileno(), 4096))
802 if proc.stdout in rdrdy:
803 # use os.read for fully unbuffered behavior
804 buf = os.read(proc.stdout.fileno(), 4096)
813 err.append(proc.stderr.read())
815 proc._known_hosts = tmp_known_hosts
816 eintr_retry(proc.wait)()
817 return ((None,''.join(err)), proc)
819 raise AssertionError, "Unreachable code reached! :-Q"
821 # Parse destination as <user>@<server>:<path>
822 if isinstance(dest, basestring) and ':' in dest:
823 remspec, path = dest.split(':',1)
824 elif isinstance(source, basestring) and ':' in source:
825 remspec, path = source.split(':',1)
827 raise ValueError, "Both endpoints cannot be local"
828 user,host = remspec.rsplit('@',1)
831 tmp_known_hosts = None
832 args = ['scp', '-q', '-p', '-C',
833 # Don't bother with localhost. Makes test easier
834 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
836 args.append('-P%d' % port)
840 args.extend(('-i', ident_key))
842 # Create a temporary server key file
843 tmp_known_hosts = _make_server_key_args(
844 server_key, host, port, args)
845 if isinstance(source,list):
851 # connects to the remote host and starts a remote connection
852 proc = subprocess.Popen(args,
853 stdout = subprocess.PIPE,
854 stdin = subprocess.PIPE,
855 stderr = subprocess.PIPE)
856 proc._known_hosts = tmp_known_hosts
858 comm = proc.communicate()
859 eintr_retry(proc.wait)()
862 def decode_and_execute():
863 # The python code we want to execute might have characters that
864 # are not compatible with the 'inline' mode we are using. To avoid
865 # problems we receive the encoded python code in base64 as a input
866 # stream and decode it for execution.
871 cmd += os.read(0, 1)# one byte from stdin
873 if e.errno == errno.EINTR:
879 cmd = base64.b64decode(cmd)
880 # Uncomment for debug
881 #os.write(2, "Executing python code: %s\n" % cmd)
882 os.write(1, "OK\n") # send a sync message
885 def popen_python(python_code,
886 communication = DC.ACCESS_LOCAL,
896 environment_setup = ""):
900 python_path.replace("'", r"'\''")
901 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
903 if environment_setup:
904 cmd += environment_setup
906 # Uncomment for debug (to run everything under strace)
907 # We had to verify if strace works (cannot nest them)
908 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
910 #cmd += "strace -f -tt -s 200 -o strace$$.out "
912 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
913 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
918 cmd = "sudo bash -c " + shell_escape(cmd)
922 if communication == DC.ACCESS_SSH:
923 tmp_known_hosts = None
925 # Don't bother with localhost. Makes test easier
926 '-o', 'NoHostAuthenticationForLocalhost=yes',
927 '-o', 'ConnectionAttempts=3',
928 '-o', 'ServerAliveInterval=30',
929 '-o', 'TCPKeepAlive=yes',
934 args.append('-p%d' % port)
936 args.extend(('-i', ident_key))
940 # Create a temporary server key file
941 tmp_known_hosts = _make_server_key_args(
942 server_key, host, port, args)
945 args = [ "/bin/bash", "-c", cmd ]
947 # connects to the remote host and starts a remote
948 proc = subprocess.Popen(args,
950 stdout = subprocess.PIPE,
951 stdin = subprocess.PIPE,
952 stderr = subprocess.PIPE)
954 if communication == DC.ACCESS_SSH:
955 proc._known_hosts = tmp_known_hosts
957 # send the command to execute
958 os.write(proc.stdin.fileno(),
959 base64.b64encode(python_code) + "\n")
963 msg = os.read(proc.stdout.fileno(), 3)
966 if e.errno == errno.EINTR:
972 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
973 msg, proc.stdout.read(), proc.stderr.read())
978 def _communicate(self, input, timeout=None, err_on_timeout=True):
981 stdout = None # Return
982 stderr = None # Return
986 if timeout is not None:
987 timelimit = time.time() + timeout
988 killtime = timelimit + 4
989 bailtime = timelimit + 4
992 # Flush stdio buffer. This might block, if the user has
993 # been writing to .stdin in an uncontrolled fashion.
996 write_set.append(self.stdin)
1000 read_set.append(self.stdout)
1003 read_set.append(self.stderr)
1007 while read_set or write_set:
1008 if timeout is not None:
1009 curtime = time.time()
1010 if timeout is None or curtime > timelimit:
1011 if curtime > bailtime:
1013 elif curtime > killtime:
1014 signum = signal.SIGKILL
1016 signum = signal.SIGTERM
1018 os.kill(self.pid, signum)
1019 select_timeout = 0.5
1021 select_timeout = timelimit - curtime + 0.1
1023 select_timeout = 1.0
1025 if select_timeout > 1.0:
1026 select_timeout = 1.0
1029 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1030 except select.error,e:
1036 if not rlist and not wlist and not xlist and self.poll() is not None:
1037 # timeout and process exited, say bye
1040 if self.stdin in wlist:
1041 # When select has indicated that the file is writable,
1042 # we can write up to PIPE_BUF bytes without risk
1043 # blocking. POSIX defines PIPE_BUF >= 512
1044 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1045 input_offset += bytes_written
1046 if input_offset >= len(input):
1048 write_set.remove(self.stdin)
1050 if self.stdout in rlist:
1051 data = os.read(self.stdout.fileno(), 1024)
1054 read_set.remove(self.stdout)
1057 if self.stderr in rlist:
1058 data = os.read(self.stderr.fileno(), 1024)
1061 read_set.remove(self.stderr)
1064 # All data exchanged. Translate lists into strings.
1065 if stdout is not None:
1066 stdout = ''.join(stdout)
1067 if stderr is not None:
1068 stderr = ''.join(stderr)
1070 # Translate newlines, if requested. We cannot let the file
1071 # object do the translation: It is based on stdio, which is
1072 # impossible to combine with select (unless forcing no
1074 if self.universal_newlines and hasattr(file, 'newlines'):
1076 stdout = self._translate_newlines(stdout)
1078 stderr = self._translate_newlines(stderr)
1080 if killed and err_on_timeout:
1081 errcode = self.poll()
1082 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1088 return (stdout, stderr)