2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
27 CTRL_SOCK = "ctrl.sock"
29 STD_ERR = "stderr.log"
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
36 OPENSSH_HAS_PERSIST = None
38 if hasattr(os, "devnull"):
41 DEV_NULL = "/dev/null"
43 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
def openssh_has_persist():
    """Tell whether the local ``ssh`` client is recent enough for ControlPersist.

    On the first call this runs ``ssh -v`` with stdin attached to /dev/null
    and stderr merged into stdout, then matches the beginning of the captured
    output against an ``OpenSSH_<version>`` pattern that accepts 5.8 and
    later. The boolean result is cached in the module-level
    ``OPENSSH_HAS_PERSIST`` so the subprocess is spawned at most once.

    Returns:
        bool: True if the version banner matches, False otherwise.
    """
    global OPENSSH_HAS_PERSIST
    if OPENSSH_HAS_PERSIST is None:
        # Use a context manager so the /dev/null handle is closed as soon as
        # the child is done instead of lingering until garbage collection.
        with open("/dev/null", "r") as null_in:
            proc = subprocess.Popen(["ssh", "-v"],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = null_in)
            # communicate() drains the pipe and waits for the child to exit;
            # stderr is merged into stdout, so only the first item is useful.
            out = proc.communicate()[0]

        # Accepts 6-9, 5.8-5.9, 5.10-5.99 and any two-digit major version.
        vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
        OPENSSH_HAS_PERSIST = bool(vre.match(out))
    return OPENSSH_HAS_PERSIST
60 """ Escapes strings so that they are safe to use as command-line arguments """
61 if SHELL_SAFE.match(s):
62 # safe string - no escaping needed
65 # unsafe string - escape
67 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
70 return "'$'\\x%02x''" % (ord(c),)
71 s = ''.join(map(escp,s))
74 def eintr_retry(func):
76 @functools.wraps(func)
78 retry = kw.pop("_retry", False)
79 for i in xrange(0 if retry else 4):
82 except (select.error, socket.error), args:
83 if args[0] == errno.EINTR:
88 if e.errno == errno.EINTR:
97 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
98 environment_setup = "", clean_root = False):
99 self._root_dir = root_dir
100 self._clean_root = clean_root
102 self._ctrl_sock = None
103 self._log_level = log_level
105 self._environment_setup = environment_setup
110 self.post_daemonize()
114 # cannot return normally after fork because no exec was done.
115 # This means that if we don't do an os._exit(0) here the code that
116 # follows the call to "Server.run()" in the "caller code" will be
117 # executed... but by now it has already been executed after the
118 # first process (the one that did the first fork) returned.
121 print >>sys.stderr, "SERVER_ERROR."
125 print >>sys.stderr, "SERVER_READY."
128 # pipes for process synchronization
132 root = os.path.normpath(self._root_dir)
133 if self._root_dir not in [".", ""] and os.path.exists(root) \
134 and self._clean_root:
136 if not os.path.exists(root):
137 os.makedirs(root, 0755)
145 except OSError, e: # pragma: no cover
146 if e.errno == errno.EINTR:
152 # os.waitpid avoids leaving a <defunct> (zombie) process
153 st = os.waitpid(pid1, 0)[1]
155 raise RuntimeError("Daemonization failed")
156 # return 0 to inform the caller method that this is not the
161 # Decouple from parent environment.
162 os.chdir(self._root_dir)
169 # see ref: "os._exit(0)"
172 # close all open file descriptors.
173 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
174 if (max_fd == resource.RLIM_INFINITY):
176 for fd in range(3, max_fd):
183 # Redirect standard file descriptors.
184 stdin = open(DEV_NULL, "r")
185 stderr = stdout = open(STD_ERR, "a", 0)
186 os.dup2(stdin.fileno(), sys.stdin.fileno())
187 # NOTE: sys.stdout.write will still be buffered, even if the file
188 # was opened with 0 buffer
189 os.dup2(stdout.fileno(), sys.stdout.fileno())
190 os.dup2(stderr.fileno(), sys.stderr.fileno())
193 if self._environment_setup:
194 # parse environment variables and pass to child process
195 # do it by executing shell commands, in case there's some heavy setup involved
196 envproc = subprocess.Popen(
198 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
199 ( self._environment_setup, ) ],
200 stdin = subprocess.PIPE,
201 stdout = subprocess.PIPE,
202 stderr = subprocess.PIPE
204 out,err = envproc.communicate()
206 # parse new environment
208 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
210 # apply to current environment
211 for name, value in environment.iteritems():
212 os.environ[name] = value
215 if 'PYTHONPATH' in environment:
216 sys.path = environment['PYTHONPATH'].split(':') + sys.path
218 # create control socket
219 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
221 self._ctrl_sock.bind(CTRL_SOCK)
223 # Address in use, check pidfile
226 pidfile = open(CTRL_PID, "r")
235 # Check process liveness
236 if not os.path.exists("/proc/%d" % (pid,)):
237 # Ok, it's dead, clean the socket
241 self._ctrl_sock.bind(CTRL_SOCK)
243 self._ctrl_sock.listen(0)
246 pidfile = open(CTRL_PID, "w")
247 pidfile.write(str(os.getpid()))
250 # let the parent process know that the daemonization is finished
255 def post_daemonize(self):
256 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
257 # QT, for some strange reason, redefines the SIGCHLD handler to write
258 # a \0 to a fd (let's say fileno 'x') whenever a SIGCHLD is received.
259 # Server daemonization closes all file descriptors from fileno '3',
260 # but the overloaded handler (inherited by the forked process) will
261 # keep trying to write the \0 to fileno 'x', which might have been reused
262 # after closing, for other operations. This is bad bad bad when fileno 'x'
263 # is in use for communication purposes, because unexpected \0 start
264 # appearing in the communication messages... this is exactly what happens
265 # when using netns in daemonized form. Thus, we have no other alternative than
266 # restoring the SIGCHLD handler to the default here.
268 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
271 while not self._stop:
272 conn, addr = self._ctrl_sock.accept()
273 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
275 while not self._stop:
277 msg = self.recv_msg(conn)
278 except socket.timeout, e:
279 #self.log_error("SERVER recv_msg: connection timedout ")
283 self.log_error("CONNECTION LOST")
288 reply = self.stop_action()
290 reply = self.reply_action(msg)
293 self.send_reply(conn, reply)
296 self.log_error("NOTICE: Awaiting for reconnection")
304 def recv_msg(self, conn):
307 while '\n' not in chunk:
309 chunk = conn.recv(1024)
310 except (OSError, socket.error), e:
311 if e[0] != errno.EINTR:
320 data = ''.join(data).split('\n',1)
323 data, self._rdbuf = data
325 decoded = base64.b64decode(data)
326 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send *reply* over *conn* as a single base64-encoded line.

    The reply is base64-encoded and terminated with a newline before being
    written to the connection.

    Args:
        conn: connected socket-like object exposing ``sendall``.
        reply: string to transmit.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole line has been handed to the
    # kernel; a plain send() may transmit only part of it and the remainder
    # (including the terminating newline) would be silently dropped.
    conn.sendall("%s\n" % encoded)
334 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text produced for the stop action."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return the default reply for *msg*: the message echoed after a prefix.

    Args:
        msg: the received message.

    Returns:
        str: ``"Reply to: "`` followed by the string form of *msg*.
    """
    # Wrap msg in a one-element tuple so that a tuple-valued msg is formatted
    # as a single value instead of being unpacked as the format arguments.
    return "Reply to: %s" % (msg,)
345 def log_error(self, text = None, context = ''):
347 text = traceback.format_exc()
348 date = time.strftime("%Y-%m-%d %H:%M:%S")
350 context = " (%s)" % (context,)
351 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped DEBUG entry for *text* to stderr.

    Nothing is written unless the instance log level equals
    ``DC.DEBUG_LEVEL``.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("DEBUG: %s\n%s\n" % (stamp, text))
359 class Forwarder(object):
360 def __init__(self, root_dir = "."):
361 self._ctrl_sock = None
362 self._root_dir = root_dir
368 print >>sys.stderr, "FORWARDER_READY."
369 while not self._stop:
370 data = self.read_data()
372 # Connection to client lost
374 self.send_to_server(data)
376 data = self.recv_from_server()
378 # Connection to server lost
379 raise IOError, "Connection to server lost while "\
381 self.write_data(data)
385 return sys.stdin.readline()
387 def write_data(self, data):
388 sys.stdout.write(data)
389 # sys.stdout.write is buffered, this is why we need to do a flush()
392 def send_to_server(self, data):
394 self._ctrl_sock.send(data)
395 except (IOError, socket.error), e:
396 if e[0] == errno.EPIPE:
398 self._ctrl_sock.send(data)
401 encoded = data.rstrip()
402 msg = base64.b64decode(encoded)
406 def recv_from_server(self):
409 while '\n' not in chunk:
411 chunk = self._ctrl_sock.recv(1024)
412 except (OSError, socket.error), e:
413 if e[0] != errno.EINTR:
421 data = ''.join(data).split('\n',1)
424 data, self._rdbuf = data
430 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
431 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
432 self._ctrl_sock.connect(sock_addr)
434 def disconnect(self):
436 self._ctrl_sock.close()
440 class Client(object):
441 def __init__(self, root_dir = ".", host = None, port = None, user = None,
442 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
443 environment_setup = ""):
444 self.root_dir = root_dir
445 self.addr = (host, port)
449 self.communication = communication
450 self.environment_setup = environment_setup
451 self._stopped = False
452 self._deferreds = collections.deque()
456 if self._process.poll() is None:
457 os.kill(self._process.pid, signal.SIGTERM)
461 root_dir = self.root_dir
462 (host, port) = self.addr
466 communication = self.communication
468 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
469 c.forward()" % (root_dir,)
471 self._process = popen_python(python_code,
472 communication = communication,
478 environment_setup = self.environment_setup)
480 # Wait for the forwarder to be ready, otherwise nobody
481 # will be able to connect to it
485 helo = self._process.stderr.readline()
486 if helo == 'FORWARDER_READY.\n':
490 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
492 def send_msg(self, msg):
493 encoded = base64.b64encode(msg)
494 data = "%s\n" % encoded
497 self._process.stdin.write(data)
498 except (IOError, ValueError):
499 # dead process, poll it to un-zombify
502 # try again after reconnect
503 # If it fails again, though, give up
505 self._process.stdin.write(data)
508 self.send_msg(STOP_MSG)
511 def defer_reply(self, transform=None):
513 self._deferreds.append(defer_entry)
515 functools.partial(self.read_reply, defer_entry, transform)
518 def _read_reply(self):
519 data = self._process.stdout.readline()
520 encoded = data.rstrip()
522 # empty == eof == dead process, poll it to un-zombify
525 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
526 return base64.b64decode(encoded)
528 def read_reply(self, which=None, transform=None):
529 # Test to see if someone did it already
530 if which is not None and len(which):
532 # ...just return the deferred value
534 return transform(which[0])
538 # Process all deferreds until the one we're looking for
539 # or until the queue is empty
540 while self._deferreds:
542 deferred = self._deferreds.popleft()
547 deferred.append(self._read_reply())
548 if deferred is which:
549 # We reached the one we were looking for
551 return transform(deferred[0])
556 # They've requested a synchronous read
558 return transform(self._read_reply())
560 return self._read_reply()
562 def _make_server_key_args(server_key, host, port, args):
564 Returns a reference to the created temporary file, and adds the
565 corresponding arguments to the given argument list.
567 Make sure to hold onto it until the process is done with the file
570 host = '%s:%s' % (host,port)
571 # Create a temporary server key file
572 tmp_known_hosts = tempfile.NamedTemporaryFile()
574 # Add the intended host key
575 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
577 # If we're not in strict mode, add user-configured keys
578 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
579 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
580 if os.access(user_hosts_path, os.R_OK):
581 f = open(user_hosts_path, "r")
582 tmp_known_hosts.write(f.read())
585 tmp_known_hosts.flush()
587 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
589 return tmp_known_hosts
591 def make_connkey(user, host, port):
592 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
593 if len(connkey) > 60:
594 connkey = hashlib.sha1(connkey).hexdigest()
597 def popen_ssh_command(command, host, port, user, agent,
604 err_on_timeout = True,
605 connect_timeout = 30,
608 Executes a remote command, returns ((stdout,stderr),process)
611 print "ssh", host, command
613 tmp_known_hosts = None
614 connkey = make_connkey(user,host,port)
616 # Don't bother with localhost. Makes test easier
617 '-o', 'NoHostAuthenticationForLocalhost=yes',
618 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
619 '-o', 'ConnectionAttempts=3',
620 '-o', 'ServerAliveInterval=30',
621 '-o', 'TCPKeepAlive=yes',
623 if persistent and openssh_has_persist():
625 '-o', 'ControlMaster=auto',
626 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
627 '-o', 'ControlPersist=60' ])
631 args.append('-p%d' % port)
633 args.extend(('-i', ident_key))
637 # Create a temporary server key file
638 tmp_known_hosts = _make_server_key_args(
639 server_key, host, port, args)
642 for x in xrange(retry or 3):
643 # connects to the remote host and starts a remote connection
644 proc = subprocess.Popen(args,
645 stdout = subprocess.PIPE,
646 stdin = subprocess.PIPE,
647 stderr = subprocess.PIPE)
649 # attach tempfile object to the process, to make sure the file stays
650 # alive until the process is finished with it
651 proc._known_hosts = tmp_known_hosts
654 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
656 if err.strip().startswith('ssh: '):
657 # SSH error, can safely retry
660 # Probably timed out or plain failed but can retry
663 except RuntimeError,e:
667 print " timedout -> ", e.args
671 print " -> ", out, err
673 return ((out, err), proc)
675 def popen_scp(source, dest,
682 Copies from/to remote sites.
684 Source and destination should have the user and host encoded
687 If source is a file object, a special mode will be used to
688 create the remote file with the same contents.
690 If dest is a file object, the remote file (source) will be
691 read and written into dest.
693 In these modes, recursive cannot be True.
695 Source can be a list of files to copy to a single destination,
696 in which case it is advised that the destination be a folder.
700 print "scp", source, dest
702 if isinstance(source, file) and source.tell() == 0:
704 elif hasattr(source, 'read'):
705 tmp = tempfile.NamedTemporaryFile()
707 buf = source.read(65536)
715 if isinstance(source, file) or isinstance(dest, file) \
716 or hasattr(source, 'read') or hasattr(dest, 'write'):
719 # Parse source/destination as <user>@<server>:<path>
720 if isinstance(dest, basestring) and ':' in dest:
721 remspec, path = dest.split(':',1)
722 elif isinstance(source, basestring) and ':' in source:
723 remspec, path = source.split(':',1)
725 raise ValueError, "Both endpoints cannot be local"
726 user,host = remspec.rsplit('@',1)
727 tmp_known_hosts = None
729 connkey = make_connkey(user,host,port)
730 args = ['ssh', '-l', user, '-C',
731 # Don't bother with localhost. Makes test easier
732 '-o', 'NoHostAuthenticationForLocalhost=yes',
733 '-o', 'ConnectTimeout=30',
734 '-o', 'ConnectionAttempts=3',
735 '-o', 'ServerAliveInterval=30',
736 '-o', 'TCPKeepAlive=yes',
738 if openssh_has_persist():
740 '-o', 'ControlMaster=auto',
741 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
742 '-o', 'ControlPersist=60' ])
744 args.append('-P%d' % port)
746 args.extend(('-i', ident_key))
748 # Create a temporary server key file
749 tmp_known_hosts = _make_server_key_args(
750 server_key, host, port, args)
752 if isinstance(source, file) or hasattr(source, 'read'):
753 args.append('cat > %s' % (shell_escape(path),))
754 elif isinstance(dest, file) or hasattr(dest, 'write'):
755 args.append('cat %s' % (shell_escape(path),))
757 raise AssertionError, "Unreachable code reached! :-Q"
759 # connects to the remote host and starts a remote connection
760 if isinstance(source, file):
761 proc = subprocess.Popen(args,
762 stdout = open('/dev/null','w'),
763 stderr = subprocess.PIPE,
765 err = proc.stderr.read()
766 proc._known_hosts = tmp_known_hosts
767 eintr_retry(proc.wait)()
768 return ((None,err), proc)
769 elif isinstance(dest, file):
770 proc = subprocess.Popen(args,
771 stdout = open('/dev/null','w'),
772 stderr = subprocess.PIPE,
774 err = proc.stderr.read()
775 proc._known_hosts = tmp_known_hosts
776 eintr_retry(proc.wait)()
777 return ((None,err), proc)
778 elif hasattr(source, 'read'):
779 # file-like (but not file) source
780 proc = subprocess.Popen(args,
781 stdout = open('/dev/null','w'),
782 stderr = subprocess.PIPE,
783 stdin = subprocess.PIPE)
789 buf = source.read(4096)
794 rdrdy, wrdy, broken = select.select(
797 [proc.stderr,proc.stdin])
799 if proc.stderr in rdrdy:
800 # use os.read for fully unbuffered behavior
801 err.append(os.read(proc.stderr.fileno(), 4096))
803 if proc.stdin in wrdy:
804 proc.stdin.write(buf)
810 err.append(proc.stderr.read())
812 proc._known_hosts = tmp_known_hosts
813 eintr_retry(proc.wait)()
814 return ((None,''.join(err)), proc)
815 elif hasattr(dest, 'write'):
816 # file-like (but not file) dest
817 proc = subprocess.Popen(args,
818 stdout = subprocess.PIPE,
819 stderr = subprocess.PIPE,
820 stdin = open('/dev/null','w'))
825 rdrdy, wrdy, broken = select.select(
826 [proc.stderr, proc.stdout],
828 [proc.stderr, proc.stdout])
830 if proc.stderr in rdrdy:
831 # use os.read for fully unbuffered behavior
832 err.append(os.read(proc.stderr.fileno(), 4096))
834 if proc.stdout in rdrdy:
835 # use os.read for fully unbuffered behavior
836 buf = os.read(proc.stdout.fileno(), 4096)
845 err.append(proc.stderr.read())
847 proc._known_hosts = tmp_known_hosts
848 eintr_retry(proc.wait)()
849 return ((None,''.join(err)), proc)
851 raise AssertionError, "Unreachable code reached! :-Q"
853 # Parse destination as <user>@<server>:<path>
854 if isinstance(dest, basestring) and ':' in dest:
855 remspec, path = dest.split(':',1)
856 elif isinstance(source, basestring) and ':' in source:
857 remspec, path = source.split(':',1)
859 raise ValueError, "Both endpoints cannot be local"
860 user,host = remspec.rsplit('@',1)
863 tmp_known_hosts = None
864 args = ['scp', '-q', '-p', '-C',
865 # Don't bother with localhost. Makes test easier
866 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
868 args.append('-P%d' % port)
872 args.extend(('-i', ident_key))
874 # Create a temporary server key file
875 tmp_known_hosts = _make_server_key_args(
876 server_key, host, port, args)
877 if isinstance(source,list):
883 # connects to the remote host and starts a remote connection
884 proc = subprocess.Popen(args,
885 stdout = subprocess.PIPE,
886 stdin = subprocess.PIPE,
887 stderr = subprocess.PIPE)
888 proc._known_hosts = tmp_known_hosts
890 comm = proc.communicate()
891 eintr_retry(proc.wait)()
894 def decode_and_execute():
895 # The python code we want to execute might have characters that
896 # are not compatible with the 'inline' mode we are using. To avoid
897 # problems we receive the encoded python code in base64 as a input
898 # stream and decode it for execution.
903 cmd += os.read(0, 1)# one byte from stdin
905 if e.errno == errno.EINTR:
911 cmd = base64.b64decode(cmd)
912 # Uncomment for debug
913 #os.write(2, "Executing python code: %s\n" % cmd)
914 os.write(1, "OK\n") # send a sync message
917 def popen_python(python_code,
918 communication = DC.ACCESS_LOCAL,
928 environment_setup = ""):
932 python_path.replace("'", r"'\''")
933 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
935 if environment_setup:
936 cmd += environment_setup
938 # Uncomment for debug (to run everything under strace)
939 # We had to verify if strace works (cannot nest them)
940 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
942 #cmd += "strace -f -tt -s 200 -o strace$$.out "
944 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
945 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
950 cmd = "sudo bash -c " + shell_escape(cmd)
954 if communication == DC.ACCESS_SSH:
955 tmp_known_hosts = None
957 # Don't bother with localhost. Makes test easier
958 '-o', 'NoHostAuthenticationForLocalhost=yes',
959 '-o', 'ConnectionAttempts=3',
960 '-o', 'ServerAliveInterval=30',
961 '-o', 'TCPKeepAlive=yes',
966 args.append('-p%d' % port)
968 args.extend(('-i', ident_key))
972 # Create a temporary server key file
973 tmp_known_hosts = _make_server_key_args(
974 server_key, host, port, args)
977 args = [ "/bin/bash", "-c", cmd ]
979 # connects to the remote host and starts a remote
980 proc = subprocess.Popen(args,
982 stdout = subprocess.PIPE,
983 stdin = subprocess.PIPE,
984 stderr = subprocess.PIPE)
986 if communication == DC.ACCESS_SSH:
987 proc._known_hosts = tmp_known_hosts
989 # send the command to execute
990 os.write(proc.stdin.fileno(),
991 base64.b64encode(python_code) + "\n")
995 msg = os.read(proc.stdout.fileno(), 3)
998 if e.errno == errno.EINTR:
1004 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1005 msg, proc.stdout.read(), proc.stderr.read())
1010 def _communicate(self, input, timeout=None, err_on_timeout=True):
1013 stdout = None # Return
1014 stderr = None # Return
1018 if timeout is not None:
1019 timelimit = time.time() + timeout
1020 killtime = timelimit + 4
1021 bailtime = timelimit + 4
1024 # Flush stdio buffer. This might block, if the user has
1025 # been writing to .stdin in an uncontrolled fashion.
1028 write_set.append(self.stdin)
1032 read_set.append(self.stdout)
1035 read_set.append(self.stderr)
1039 while read_set or write_set:
1040 if timeout is not None:
1041 curtime = time.time()
1042 if timeout is None or curtime > timelimit:
1043 if curtime > bailtime:
1045 elif curtime > killtime:
1046 signum = signal.SIGKILL
1048 signum = signal.SIGTERM
1050 os.kill(self.pid, signum)
1051 select_timeout = 0.5
1053 select_timeout = timelimit - curtime + 0.1
1055 select_timeout = 1.0
1057 if select_timeout > 1.0:
1058 select_timeout = 1.0
1061 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1062 except select.error,e:
1068 if not rlist and not wlist and not xlist and self.poll() is not None:
1069 # timeout and process exited, say bye
1072 if self.stdin in wlist:
1073 # When select has indicated that the file is writable,
1074 # we can write up to PIPE_BUF bytes without risk
1075 # blocking. POSIX defines PIPE_BUF >= 512
1076 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1077 input_offset += bytes_written
1078 if input_offset >= len(input):
1080 write_set.remove(self.stdin)
1082 if self.stdout in rlist:
1083 data = os.read(self.stdout.fileno(), 1024)
1086 read_set.remove(self.stdout)
1089 if self.stderr in rlist:
1090 data = os.read(self.stderr.fileno(), 1024)
1093 read_set.remove(self.stderr)
1096 # All data exchanged. Translate lists into strings.
1097 if stdout is not None:
1098 stdout = ''.join(stdout)
1099 if stderr is not None:
1100 stderr = ''.join(stderr)
1102 # Translate newlines, if requested. We cannot let the file
1103 # object do the translation: It is based on stdio, which is
1104 # impossible to combine with select (unless forcing no
1106 if self.universal_newlines and hasattr(file, 'newlines'):
1108 stdout = self._translate_newlines(stdout)
1110 stderr = self._translate_newlines(stderr)
1112 if killed and err_on_timeout:
1113 errcode = self.poll()
1114 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1120 return (stdout, stderr)