2 # -*- coding: utf-8 -*-
4 from nepi.util.constants import DeploymentConfiguration as DC
27 CTRL_SOCK = "ctrl.sock"
29 STD_ERR = "stderr.log"
34 TRACE = os.environ.get("NEPI_TRACE", "false").lower() in ("true", "1", "on")
36 OPENSSH_HAS_PERSIST = None
38 if hasattr(os, "devnull"):
41 DEV_NULL = "/dev/null"
43 SHELL_SAFE = re.compile('^[-a-zA-Z0-9_=+:.,/]*$')
45 def openssh_has_persist():
46 global OPENSSH_HAS_PERSIST
47 if OPENSSH_HAS_PERSIST is None:
48 proc = subprocess.Popen(["ssh","-v"],
49 stdout = subprocess.PIPE,
50 stderr = subprocess.STDOUT,
51 stdin = open("/dev/null","r") )
52 out,err = proc.communicate()
55 vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
56 OPENSSH_HAS_PERSIST = bool(vre.match(out))
57 return OPENSSH_HAS_PERSIST
60 """ Escapes strings so that they are safe to use as command-line arguments """
61 if SHELL_SAFE.match(s):
62 # safe string - no escaping needed
65 # unsafe string - escape
67 if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
70 return "'$'\\x%02x''" % (ord(c),)
71 s = ''.join(map(escp,s))
74 def eintr_retry(func):
76 @functools.wraps(func)
78 retry = kw.pop("_retry", False)
79 for i in xrange(0 if retry else 4):
82 except (select.error, socket.error), args:
83 if args[0] == errno.EINTR:
88 if e.errno == errno.EINTR:
97 def __init__(self, root_dir = ".", log_level = DC.ERROR_LEVEL,
98 environment_setup = "", clean_root = False):
99 self._root_dir = root_dir
100 self._clean_root = clean_root
102 self._ctrl_sock = None
103 self._log_level = log_level
105 self._environment_setup = environment_setup
110 self.post_daemonize()
114 # cannot return normally after fork because no exec was done.
115 # This means that if we don't do an os._exit(0) here the code that
116 # follows the call to "Server.run()" in the "caller code" will be
117 # executed... but by now it has already been executed after the
118 # first process (the one that did the first fork) returned.
121 print >>sys.stderr, "SERVER_ERROR."
125 print >>sys.stderr, "SERVER_READY."
128 # pipes for process synchronization
132 root = os.path.normpath(self._root_dir)
133 if self._root_dir not in [".", ""] and os.path.exists(root) \
134 and self._clean_root:
136 if not os.path.exists(root):
137 os.makedirs(root, 0755)
145 except OSError, e: # pragma: no cover
146 if e.errno == errno.EINTR:
152 # os.waitpid avoids leaving a <defunct> (zombie) process
153 st = os.waitpid(pid1, 0)[1]
155 raise RuntimeError("Daemonization failed")
156 # return 0 to inform the caller method that this is not the
161 # Decouple from parent environment.
162 os.chdir(self._root_dir)
169 # see ref: "os._exit(0)"
172 # close all open file descriptors.
173 max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
174 if (max_fd == resource.RLIM_INFINITY):
176 for fd in range(3, max_fd):
183 # Redirect standard file descriptors.
184 stdin = open(DEV_NULL, "r")
185 stderr = stdout = open(STD_ERR, "a", 0)
186 os.dup2(stdin.fileno(), sys.stdin.fileno())
187 # NOTE: sys.stdout.write will still be buffered, even if the file
188 # was opened with 0 buffer
189 os.dup2(stdout.fileno(), sys.stdout.fileno())
190 os.dup2(stderr.fileno(), sys.stderr.fileno())
193 if self._environment_setup:
194 # parse environment variables and pass to child process
195 # do it by executing shell commands, in case there's some heavy setup involved
196 envproc = subprocess.Popen(
198 "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
199 ( self._environment_setup, ) ],
200 stdin = subprocess.PIPE,
201 stdout = subprocess.PIPE,
202 stderr = subprocess.PIPE
204 out,err = envproc.communicate()
206 # parse new environment
208 environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
210 # apply to current environment
211 for name, value in environment.iteritems():
212 os.environ[name] = value
215 if 'PYTHONPATH' in environment:
216 sys.path = environment['PYTHONPATH'].split(':') + sys.path
218 # create control socket
219 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
221 self._ctrl_sock.bind(CTRL_SOCK)
223 # Address in use, check pidfile
226 pidfile = open(CTRL_PID, "r")
235 # Check process liveliness
236 if not os.path.exists("/proc/%d" % (pid,)):
237 # Ok, it's dead, clean the socket
241 self._ctrl_sock.bind(CTRL_SOCK)
243 self._ctrl_sock.listen(0)
246 pidfile = open(CTRL_PID, "w")
247 pidfile.write(str(os.getpid()))
250 # let the parent process know that the daemonization is finished
255 def post_daemonize(self):
256 os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
257 # QT, for some strange reason, redefines the SIGCHLD handler to write
258 # a \0 to a fd (let's say fileno 'x'), whenever a SIGCHLD is received.
259 # Server daemonization closes all file descriptors from fileno '3',
260 # but the overloaded handler (inherited by the forked process) will
261 # keep trying to write the \0 to fileno 'x', which might have been reused
262 # after closing, for other operations. This is bad bad bad when fileno 'x'
263 # is in use for communication purposes, because unexpected \0 start
264 # appearing in the communication messages... this is exactly what happens
265 # when using netns in daemonized form. Thus, we have no other alternative than
266 # restoring the SIGCHLD handler to the default here.
268 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
271 while not self._stop:
272 conn, addr = self._ctrl_sock.accept()
273 self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
275 while not self._stop:
277 msg = self.recv_msg(conn)
278 except socket.timeout, e:
279 #self.log_error("SERVER recv_msg: connection timedout ")
283 self.log_error("CONNECTION LOST")
288 reply = self.stop_action()
290 reply = self.reply_action(msg)
293 self.send_reply(conn, reply)
296 self.log_error("NOTICE: Awaiting for reconnection")
304 def recv_msg(self, conn):
307 while '\n' not in chunk:
309 chunk = conn.recv(1024)
310 except (OSError, socket.error), e:
311 if e[0] != errno.EINTR:
320 data = ''.join(data).split('\n',1)
323 data, self._rdbuf = data
325 decoded = base64.b64decode(data)
326 return decoded.rstrip()
def send_reply(self, conn, reply):
    """Send a reply to the peer as a single base64-encoded line.

    The reply is base64-encoded and terminated with a newline, which is
    the framing the receiving side splits on.

    conn  -- connected socket-like object providing ``sendall``.
    reply -- byte string holding the raw reply payload.
    """
    encoded = base64.b64encode(reply)
    # sendall() keeps writing until the whole frame is out; a plain send()
    # may transmit only part of the buffer and silently drop the rest,
    # leaving the peer waiting for a newline that never arrives.
    conn.sendall(encoded + b"\n")
334 self._ctrl_sock.close()
def stop_action(self):
    """Return the reply text used when the server is asked to stop."""
    reply = "Stopping server"
    return reply
def reply_action(self, msg):
    """Return the reply text for a received message.

    This implementation echoes the message back prefixed with
    "Reply to: ".

    msg -- the decoded message received from the peer.
    """
    # Wrap msg in a 1-tuple so a tuple-valued message is formatted as a
    # whole instead of being unpacked into several format arguments.
    return "Reply to: %s" % (msg,)
345 def log_error(self, text = None, context = ''):
347 text = traceback.format_exc()
348 date = time.strftime("%Y-%m-%d %H:%M:%S")
350 context = " (%s)" % (context,)
351 sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
def log_debug(self, text):
    """Write a timestamped debug entry to stderr.

    Nothing is written unless the instance's log level equals
    DC.DEBUG_LEVEL. The entry is a "DEBUG: <timestamp>" header line
    followed by the given text on the next line.
    """
    if self._log_level != DC.DEBUG_LEVEL:
        return
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "DEBUG: %s\n%s\n" % (stamp, text)
    sys.stderr.write(entry)
359 class Forwarder(object):
360 def __init__(self, root_dir = "."):
361 self._ctrl_sock = None
362 self._root_dir = root_dir
368 print >>sys.stderr, "FORWARDER_READY."
369 while not self._stop:
370 data = self.read_data()
372 # Connection to client lost
374 self.send_to_server(data)
376 data = self.recv_from_server()
378 # Connection to server lost
379 raise IOError, "Connection to server lost while "\
381 self.write_data(data)
385 return sys.stdin.readline()
387 def write_data(self, data):
388 sys.stdout.write(data)
389 # sys.stdout.write is buffered, this is why we need to do a flush()
392 def send_to_server(self, data):
394 self._ctrl_sock.send(data)
395 except (IOError, socket.error), e:
396 if e[0] == errno.EPIPE:
398 self._ctrl_sock.send(data)
401 encoded = data.rstrip()
402 msg = base64.b64decode(encoded)
406 def recv_from_server(self):
409 while '\n' not in chunk:
411 chunk = self._ctrl_sock.recv(1024)
412 except (OSError, socket.error), e:
413 if e[0] != errno.EINTR:
421 data = ''.join(data).split('\n',1)
424 data, self._rdbuf = data
430 self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
431 sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
432 self._ctrl_sock.connect(sock_addr)
434 def disconnect(self):
436 self._ctrl_sock.close()
440 class Client(object):
441 def __init__(self, root_dir = ".", host = None, port = None, user = None,
442 agent = None, sudo = False, communication = DC.ACCESS_LOCAL,
443 environment_setup = ""):
444 self.root_dir = root_dir
445 self.addr = (host, port)
449 self.communication = communication
450 self.environment_setup = environment_setup
451 self._stopped = False
452 self._deferreds = collections.deque()
456 if self._process.poll() is None:
457 os.kill(self._process.pid, signal.SIGTERM)
461 root_dir = self.root_dir
462 (host, port) = self.addr
466 communication = self.communication
468 python_code = "from nepi.util import server;c=server.Forwarder(%r);\
469 c.forward()" % (root_dir,)
471 self._process = popen_python(python_code,
472 communication = communication,
478 environment_setup = self.environment_setup)
480 # Wait for the forwarder to be ready, otherwise nobody
481 # will be able to connect to it
485 helo = self._process.stderr.readline()
486 if helo == 'FORWARDER_READY.\n':
490 raise AssertionError, "Expected 'FORWARDER_READY.', got: %s" % (''.join(err),)
492 def send_msg(self, msg):
493 encoded = base64.b64encode(msg)
494 data = "%s\n" % encoded
497 self._process.stdin.write(data)
498 except (IOError, ValueError):
499 # dead process, poll it to un-zombify
502 # try again after reconnect
503 # If it fails again, though, give up
505 self._process.stdin.write(data)
508 self.send_msg(STOP_MSG)
511 def defer_reply(self, transform=None):
513 self._deferreds.append(defer_entry)
515 functools.partial(self.read_reply, defer_entry, transform)
518 def _read_reply(self):
519 data = self._process.stdout.readline()
520 encoded = data.rstrip()
522 # empty == eof == dead process, poll it to un-zombify
525 raise RuntimeError, "Forwarder died while awaiting reply: %s" % (self._process.stderr.read(),)
526 return base64.b64decode(encoded)
528 def read_reply(self, which=None, transform=None):
529 # Test to see if someone did it already
530 if which is not None and len(which):
532 # ...just return the deferred value
534 return transform(which[0])
538 # Process all deferreds until the one we're looking for
539 # or until the queue is empty
540 while self._deferreds:
542 deferred = self._deferreds.popleft()
547 deferred.append(self._read_reply())
548 if deferred is which:
549 # We reached the one we were looking for
551 return transform(deferred[0])
556 # They've requested a synchronous read
558 return transform(self._read_reply())
560 return self._read_reply()
562 def _make_server_key_args(server_key, host, port, args):
564 Returns a reference to the created temporary file, and adds the
565 corresponding arguments to the given argument list.
567 Make sure to hold onto it until the process is done with the file
570 host = '%s:%s' % (host,port)
571 # Create a temporary server key file
572 tmp_known_hosts = tempfile.NamedTemporaryFile()
574 # Add the intended host key
575 tmp_known_hosts.write('%s,%s %s\n' % (host, socket.gethostbyname(host), server_key))
577 # If we're not in strict mode, add user-configured keys
578 if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
579 user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
580 if os.access(user_hosts_path, os.R_OK):
581 f = open(user_hosts_path, "r")
582 tmp_known_hosts.write(f.read())
585 tmp_known_hosts.flush()
587 args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
589 return tmp_known_hosts
591 def make_connkey(user, host, port):
592 connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
593 if len(connkey) > 60:
594 connkey = hashlib.sha1(connkey).hexdigest()
597 def popen_ssh_command(command, host, port, user, agent,
604 err_on_timeout = True,
605 connect_timeout = 30,
609 Executes a remote command, returns ((stdout,stderr),process)
612 print "ssh", host, command
614 tmp_known_hosts = None
615 connkey = make_connkey(user,host,port)
617 # Don't bother with localhost. Makes test easier
618 '-o', 'NoHostAuthenticationForLocalhost=yes',
619 '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
620 '-o', 'ConnectionAttempts=3',
621 '-o', 'ServerAliveInterval=30',
622 '-o', 'TCPKeepAlive=yes',
623 '-l', user, hostip or host]
624 if persistent and openssh_has_persist():
626 '-o', 'ControlMaster=auto',
627 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
628 '-o', 'ControlPersist=60' ])
632 args.append('-p%d' % port)
634 args.extend(('-i', ident_key))
638 # Create a temporary server key file
639 tmp_known_hosts = _make_server_key_args(
640 server_key, host, port, args)
643 for x in xrange(retry or 3):
644 # connects to the remote host and starts a remote connection
645 proc = subprocess.Popen(args,
646 stdout = subprocess.PIPE,
647 stdin = subprocess.PIPE,
648 stderr = subprocess.PIPE)
650 # attach tempfile object to the process, to make sure the file stays
651 # alive until the process is finished with it
652 proc._known_hosts = tmp_known_hosts
655 out, err = _communicate(proc, stdin, timeout, err_on_timeout)
657 if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
658 # SSH error, can safely retry
661 # Probably timed out or plain failed but can retry
664 except RuntimeError,e:
668 print " timedout -> ", e.args
672 print " -> ", out, err
674 return ((out, err), proc)
676 def popen_scp(source, dest,
683 Copies from/to remote sites.
685 Source and destination should have the user and host encoded
688 If source is a file object, a special mode will be used to
689 create the remote file with the same contents.
691 If dest is a file object, the remote file (source) will be
692 read and written into dest.
694 In these modes, recursive cannot be True.
696 Source can be a list of files to copy to a single destination,
697 in which case it is advised that the destination be a folder.
701 print "scp", source, dest
703 if isinstance(source, file) and source.tell() == 0:
705 elif hasattr(source, 'read'):
706 tmp = tempfile.NamedTemporaryFile()
708 buf = source.read(65536)
716 if isinstance(source, file) or isinstance(dest, file) \
717 or hasattr(source, 'read') or hasattr(dest, 'write'):
720 # Parse source/destination as <user>@<server>:<path>
721 if isinstance(dest, basestring) and ':' in dest:
722 remspec, path = dest.split(':',1)
723 elif isinstance(source, basestring) and ':' in source:
724 remspec, path = source.split(':',1)
726 raise ValueError, "Both endpoints cannot be local"
727 user,host = remspec.rsplit('@',1)
728 tmp_known_hosts = None
730 connkey = make_connkey(user,host,port)
731 args = ['ssh', '-l', user, '-C',
732 # Don't bother with localhost. Makes test easier
733 '-o', 'NoHostAuthenticationForLocalhost=yes',
734 '-o', 'ConnectTimeout=30',
735 '-o', 'ConnectionAttempts=3',
736 '-o', 'ServerAliveInterval=30',
737 '-o', 'TCPKeepAlive=yes',
739 if openssh_has_persist():
741 '-o', 'ControlMaster=auto',
742 '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
743 '-o', 'ControlPersist=60' ])
745 args.append('-P%d' % port)
747 args.extend(('-i', ident_key))
749 # Create a temporary server key file
750 tmp_known_hosts = _make_server_key_args(
751 server_key, host, port, args)
753 if isinstance(source, file) or hasattr(source, 'read'):
754 args.append('cat > %s' % (shell_escape(path),))
755 elif isinstance(dest, file) or hasattr(dest, 'write'):
756 args.append('cat %s' % (shell_escape(path),))
758 raise AssertionError, "Unreachable code reached! :-Q"
760 # connects to the remote host and starts a remote connection
761 if isinstance(source, file):
762 proc = subprocess.Popen(args,
763 stdout = open('/dev/null','w'),
764 stderr = subprocess.PIPE,
766 err = proc.stderr.read()
767 proc._known_hosts = tmp_known_hosts
768 eintr_retry(proc.wait)()
769 return ((None,err), proc)
770 elif isinstance(dest, file):
771 proc = subprocess.Popen(args,
772 stdout = open('/dev/null','w'),
773 stderr = subprocess.PIPE,
775 err = proc.stderr.read()
776 proc._known_hosts = tmp_known_hosts
777 eintr_retry(proc.wait)()
778 return ((None,err), proc)
779 elif hasattr(source, 'read'):
780 # file-like (but not file) source
781 proc = subprocess.Popen(args,
782 stdout = open('/dev/null','w'),
783 stderr = subprocess.PIPE,
784 stdin = subprocess.PIPE)
790 buf = source.read(4096)
795 rdrdy, wrdy, broken = select.select(
798 [proc.stderr,proc.stdin])
800 if proc.stderr in rdrdy:
801 # use os.read for fully unbuffered behavior
802 err.append(os.read(proc.stderr.fileno(), 4096))
804 if proc.stdin in wrdy:
805 proc.stdin.write(buf)
811 err.append(proc.stderr.read())
813 proc._known_hosts = tmp_known_hosts
814 eintr_retry(proc.wait)()
815 return ((None,''.join(err)), proc)
816 elif hasattr(dest, 'write'):
817 # file-like (but not file) dest
818 proc = subprocess.Popen(args,
819 stdout = subprocess.PIPE,
820 stderr = subprocess.PIPE,
821 stdin = open('/dev/null','w'))
826 rdrdy, wrdy, broken = select.select(
827 [proc.stderr, proc.stdout],
829 [proc.stderr, proc.stdout])
831 if proc.stderr in rdrdy:
832 # use os.read for fully unbuffered behavior
833 err.append(os.read(proc.stderr.fileno(), 4096))
835 if proc.stdout in rdrdy:
836 # use os.read for fully unbuffered behavior
837 buf = os.read(proc.stdout.fileno(), 4096)
846 err.append(proc.stderr.read())
848 proc._known_hosts = tmp_known_hosts
849 eintr_retry(proc.wait)()
850 return ((None,''.join(err)), proc)
852 raise AssertionError, "Unreachable code reached! :-Q"
854 # Parse destination as <user>@<server>:<path>
855 if isinstance(dest, basestring) and ':' in dest:
856 remspec, path = dest.split(':',1)
857 elif isinstance(source, basestring) and ':' in source:
858 remspec, path = source.split(':',1)
860 raise ValueError, "Both endpoints cannot be local"
861 user,host = remspec.rsplit('@',1)
864 tmp_known_hosts = None
865 args = ['scp', '-q', '-p', '-C',
866 # Don't bother with localhost. Makes test easier
867 '-o', 'NoHostAuthenticationForLocalhost=yes' ]
869 args.append('-P%d' % port)
873 args.extend(('-i', ident_key))
875 # Create a temporary server key file
876 tmp_known_hosts = _make_server_key_args(
877 server_key, host, port, args)
878 if isinstance(source,list):
884 # connects to the remote host and starts a remote connection
885 proc = subprocess.Popen(args,
886 stdout = subprocess.PIPE,
887 stdin = subprocess.PIPE,
888 stderr = subprocess.PIPE)
889 proc._known_hosts = tmp_known_hosts
891 comm = proc.communicate()
892 eintr_retry(proc.wait)()
895 def decode_and_execute():
896 # The python code we want to execute might have characters that
897 # are not compatible with the 'inline' mode we are using. To avoid
898 # problems we receive the encoded python code in base64 as a input
899 # stream and decode it for execution.
904 cmd += os.read(0, 1)# one byte from stdin
906 if e.errno == errno.EINTR:
912 cmd = base64.b64decode(cmd)
913 # Uncomment for debug
914 #os.write(2, "Executing python code: %s\n" % cmd)
915 os.write(1, "OK\n") # send a sync message
918 def popen_python(python_code,
919 communication = DC.ACCESS_LOCAL,
929 environment_setup = ""):
933 python_path.replace("'", r"'\''")
934 cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
936 if environment_setup:
937 cmd += environment_setup
939 # Uncomment for debug (to run everything under strace)
940 # We had to verify if strace works (cannot nest them)
941 #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
943 #cmd += "strace -f -tt -s 200 -o strace$$.out "
945 cmd += "python -c 'import sys; sys.path.insert(0,%s); from nepi.util import server; server.decode_and_execute()'" % (
946 repr(os.path.dirname(os.path.dirname(nepi.__file__))).replace("'",'"'),
951 cmd = "sudo bash -c " + shell_escape(cmd)
955 if communication == DC.ACCESS_SSH:
956 tmp_known_hosts = None
958 # Don't bother with localhost. Makes test easier
959 '-o', 'NoHostAuthenticationForLocalhost=yes',
960 '-o', 'ConnectionAttempts=3',
961 '-o', 'ServerAliveInterval=30',
962 '-o', 'TCPKeepAlive=yes',
967 args.append('-p%d' % port)
969 args.extend(('-i', ident_key))
973 # Create a temporary server key file
974 tmp_known_hosts = _make_server_key_args(
975 server_key, host, port, args)
978 args = [ "/bin/bash", "-c", cmd ]
980 # connects to the remote host and starts a remote connection
981 proc = subprocess.Popen(args,
983 stdout = subprocess.PIPE,
984 stdin = subprocess.PIPE,
985 stderr = subprocess.PIPE)
987 if communication == DC.ACCESS_SSH:
988 proc._known_hosts = tmp_known_hosts
990 # send the command to execute
991 os.write(proc.stdin.fileno(),
992 base64.b64encode(python_code) + "\n")
996 msg = os.read(proc.stdout.fileno(), 3)
999 if e.errno == errno.EINTR:
1005 raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
1006 msg, proc.stdout.read(), proc.stderr.read())
1011 def _communicate(self, input, timeout=None, err_on_timeout=True):
1014 stdout = None # Return
1015 stderr = None # Return
1019 if timeout is not None:
1020 timelimit = time.time() + timeout
1021 killtime = timelimit + 4
1022 bailtime = timelimit + 4
1025 # Flush stdio buffer. This might block, if the user has
1026 # been writing to .stdin in an uncontrolled fashion.
1029 write_set.append(self.stdin)
1033 read_set.append(self.stdout)
1036 read_set.append(self.stderr)
1040 while read_set or write_set:
1041 if timeout is not None:
1042 curtime = time.time()
1043 if timeout is None or curtime > timelimit:
1044 if curtime > bailtime:
1046 elif curtime > killtime:
1047 signum = signal.SIGKILL
1049 signum = signal.SIGTERM
1051 os.kill(self.pid, signum)
1052 select_timeout = 0.5
1054 select_timeout = timelimit - curtime + 0.1
1056 select_timeout = 1.0
1058 if select_timeout > 1.0:
1059 select_timeout = 1.0
1062 rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
1063 except select.error,e:
1069 if not rlist and not wlist and not xlist and self.poll() is not None:
1070 # timeout and process exited, say bye
1073 if self.stdin in wlist:
1074 # When select has indicated that the file is writable,
1075 # we can write up to PIPE_BUF bytes without risk
1076 # blocking. POSIX defines PIPE_BUF >= 512
1077 bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
1078 input_offset += bytes_written
1079 if input_offset >= len(input):
1081 write_set.remove(self.stdin)
1083 if self.stdout in rlist:
1084 data = os.read(self.stdout.fileno(), 1024)
1087 read_set.remove(self.stdout)
1090 if self.stderr in rlist:
1091 data = os.read(self.stderr.fileno(), 1024)
1094 read_set.remove(self.stderr)
1097 # All data exchanged. Translate lists into strings.
1098 if stdout is not None:
1099 stdout = ''.join(stdout)
1100 if stderr is not None:
1101 stderr = ''.join(stderr)
1103 # Translate newlines, if requested. We cannot let the file
1104 # object do the translation: It is based on stdio, which is
1105 # impossible to combine with select (unless forcing no
1107 if self.universal_newlines and hasattr(file, 'newlines'):
1109 stdout = self._translate_newlines(stdout)
1111 stderr = self._translate_newlines(stderr)
1113 if killed and err_on_timeout:
1114 errcode = self.poll()
1115 raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
1121 return (stdout, stderr)