--- /dev/null
+import select\r
+import socket\r
+\r
+\r
+class Stream(object):\r
+ """\r
+ a stream is a file-like object that is used to expose a consistent and uniform interface\r
+ to the 'physical' file-like objects (like sockets and pipes), which have many quirks (sockets\r
+ may recv() less than `count`, pipes are simplex and don't flush, etc.).\r
+ a stream is always in blocking mode.\r
+ """\r
+ \r
+ def close(self):\r
+ raise NotImplementedError()\r
+\r
+ def fileno(self):\r
+ raise NotImplementedError()\r
+\r
+ def is_available(self):\r
+ rlist, wlist, xlist = select.select([self], [], [], 0)\r
+ return bool(rlist)\r
+\r
+ def wait(self):\r
+ select.select([self], [], [])\r
+\r
+ def read(self, count):\r
+ raise NotImplementedError()\r
+\r
+ def write(self, data):\r
+ raise NotImplementedError()\r
+ \r
+ \r
+class SocketStream(Stream):\r
+ """\r
+ a stream that operates over a socket. note: \r
+ * the socket is expected to be reliable (i.e., TCP)\r
+ * the socket is expected to be in blocking mode\r
+ """\r
+ def __init__(self, sock):\r
+ self.sock = sock\r
+ \r
+ def __repr__(self):\r
+ host, port = self.sock.getpeername()\r
+ return "<%s(%s:%d)>" % (self.__class__.__name__, host, port)\r
+\r
+ def from_new_socket(cls, host, port, **kw):\r
+ sock = socket.socket(**kw)\r
+ sock.connect((host, port))\r
+ return cls(sock)\r
+ from_new_socket = classmethod( from_new_socket )\r
+\r
+ def fileno(self):\r
+ return self.sock.fileno()\r
+ \r
+ def close(self):\r
+ self.sock.close()\r
+ \r
+ def read(self, count):\r
+ data = []\r
+ while count > 0:\r
+ buf = self.sock.recv(count)\r
+ if not buf:\r
+ raise EOFError()\r
+ count -= len(buf)\r
+ data.append(buf)\r
+ return "".join(data)\r
+ \r
+ def write(self, data):\r
+ while data:\r
+ count = self.sock.send(data)\r
+ data = data[count:]\r
+\r
+\r
+class PipeStream(Stream):\r
+ """\r
+ a stream that operates over two simplex pipes. \r
+ note: the pipes are expected to be in blocking mode\r
+ """\r
+ \r
+ def __init__(self, incoming, outgoing):\r
+ self.incoming = incoming\r
+ self.outgoing = outgoing\r
+\r
+ def fileno(self):\r
+ return self.incoming.fileno()\r
+ \r
+ def close(self):\r
+ self.incoming.close()\r
+ self.outgoing.close()\r
+ \r
+ def read(self, count):\r
+ data = []\r
+ while count > 0:\r
+ buf = self.incoming.read(count)\r
+ if not buf:\r
+ raise EOFError()\r
+ count -= len(buf)\r
+ data.append(buf)\r
+ return "".join(data)\r
+ \r
+ def write(self, data):\r
+ self.outgoing.write(data)\r
+ self.outgoing.flush()\r
+\r
+ # win32: stubs\r
+ import sys\r
+ if sys.platform == "win32":\r
+ def is_available(self):\r
+ return True\r
+\r
+ def wait(self):\r
+ pass\r
+\r
+\r
+\r
+\r
+\r