pull in additional changes from 2.0 branch.
[monitor.git] / monitor / Rpyc / Stream.py
diff --git a/monitor/Rpyc/Stream.py b/monitor/Rpyc/Stream.py
new file mode 100644 (file)
index 0000000..66568b6
--- /dev/null
@@ -0,0 +1,117 @@
+import select\r
+import socket\r
+\r
+\r
+class Stream(object):\r
+    """\r
+    a stream is a file-like object that is used to expose a consistent and uniform interface\r
+    to the 'physical' file-like objects (like sockets and pipes), which have many quirks (sockets\r
+    may recv() less than `count`, pipes are simplex and don't flush, etc.).\r
+    a stream is always in blocking mode.\r
+    """\r
+    \r
+    def close(self):\r
+        raise NotImplementedError()\r
+\r
+    def fileno(self):\r
+        raise NotImplementedError()\r
+\r
+    def is_available(self):\r
+        rlist, wlist, xlist = select.select([self], [], [], 0)\r
+        return bool(rlist)\r
+\r
+    def wait(self):\r
+        select.select([self], [], [])\r
+\r
+    def read(self, count):\r
+        raise NotImplementedError()\r
+\r
+    def write(self, data):\r
+        raise NotImplementedError()\r
+        \r
+        \r
+class SocketStream(Stream):\r
+    """\r
+    a stream that operates over a socket. note: \r
+        * the socket is expected to be reliable (i.e., TCP)\r
+        * the socket is expected to be in blocking mode\r
+    """\r
+    def __init__(self, sock):\r
+        self.sock = sock\r
+    \r
+    def __repr__(self):\r
+        host, port = self.sock.getpeername()\r
+        return "<%s(%s:%d)>" % (self.__class__.__name__, host, port)\r
+\r
+    def from_new_socket(cls, host, port, **kw):\r
+        sock = socket.socket(**kw)\r
+        sock.connect((host, port))\r
+        return cls(sock)\r
+    from_new_socket = classmethod( from_new_socket )\r
+\r
+    def fileno(self):\r
+        return self.sock.fileno()\r
+        \r
+    def close(self):\r
+        self.sock.close()\r
+        \r
+    def read(self, count):\r
+        data = []\r
+        while count > 0:\r
+            buf = self.sock.recv(count)\r
+            if not buf:\r
+                raise EOFError()\r
+            count -= len(buf)\r
+            data.append(buf)\r
+        return "".join(data)\r
+            \r
+    def write(self, data):\r
+        while data:\r
+            count = self.sock.send(data)\r
+            data = data[count:]\r
+\r
+\r
+class PipeStream(Stream):\r
+    """\r
+    a stream that operates over two simplex pipes. \r
+    note: the pipes are expected to be in blocking mode\r
+    """\r
+    \r
+    def __init__(self, incoming, outgoing):\r
+        self.incoming = incoming\r
+        self.outgoing = outgoing\r
+\r
+    def fileno(self):\r
+        return self.incoming.fileno()\r
+        \r
+    def close(self):\r
+        self.incoming.close()\r
+        self.outgoing.close()\r
+        \r
+    def read(self, count):\r
+        data = []\r
+        while count > 0:\r
+            buf = self.incoming.read(count)\r
+            if not buf:\r
+                raise EOFError()\r
+            count -= len(buf)\r
+            data.append(buf)\r
+        return "".join(data)\r
+            \r
+    def write(self, data):\r
+        self.outgoing.write(data)\r
+        self.outgoing.flush()\r
+\r
+    # win32: stubs\r
+    import sys\r
+    if sys.platform == "win32":\r
+        def is_available(self):\r
+            return True\r
+\r
+        def wait(self):\r
+            pass\r
+\r
+\r
+\r
+\r
+\r