pull in additional changes from 2.0 branch.
[monitor.git] / monitor / Rpyc / Channel.py
diff --git a/monitor/Rpyc/Channel.py b/monitor/Rpyc/Channel.py
new file mode 100644 (file)
index 0000000..106f936
--- /dev/null
@@ -0,0 +1,48 @@
+from threading import RLock\r
+import struct\r
+\r
+\r
+class Channel(object):\r
+    """a channel transfers packages over a stream. a package is any blob of data,\r
+    up to 4GB in size. channels are gauranteed to be thread-safe"""\r
+    HEADER_FORMAT = ">L" # byte order must be the same at both sides!\r
+    HEADER_SIZE = struct.calcsize(HEADER_FORMAT)\r
+\r
+    def __init__(self, stream):\r
+        self.lock = RLock()\r
+        self.stream = stream\r
+\r
+    def __repr__(self):\r
+        return "<%s(%r)>" % (self.__class__.__name__, self.stream)\r
+\r
+    def send(self, data):\r
+        """sends a package"""\r
+        try:\r
+            self.lock.acquire()\r
+            header = struct.pack(self.HEADER_FORMAT, len(data))\r
+            self.stream.write(header + data)\r
+        finally:\r
+            self.lock.release()\r
+        \r
+    def recv(self):\r
+        """receives a package (blocking)"""\r
+        try:\r
+            self.lock.acquire()\r
+            length, = struct.unpack(self.HEADER_FORMAT, self.stream.read(self.HEADER_SIZE))\r
+            return self.stream.read(length)\r
+        finally:\r
+            self.lock.release()\r
+    \r
+    def close(self):\r
+        return self.stream.close()\r
+\r
+    def fileno(self):\r
+        return self.stream.fileno()\r
+\r
+    def is_available(self):\r
+        return self.stream.is_available()\r
+\r
+    def wait(self):\r
+        return self.stream.wait()\r
+\r
+\r