--- /dev/null
+from threading import RLock\r
+import struct\r
+\r
+\r
+class Channel(object):\r
+ """a channel transfers packages over a stream. a package is any blob of data,\r
+ up to 4GB in size. channels are gauranteed to be thread-safe"""\r
+ HEADER_FORMAT = ">L" # byte order must be the same at both sides!\r
+ HEADER_SIZE = struct.calcsize(HEADER_FORMAT)\r
+\r
+ def __init__(self, stream):\r
+ self.lock = RLock()\r
+ self.stream = stream\r
+\r
+ def __repr__(self):\r
+ return "<%s(%r)>" % (self.__class__.__name__, self.stream)\r
+\r
+ def send(self, data):\r
+ """sends a package"""\r
+ try:\r
+ self.lock.acquire()\r
+ header = struct.pack(self.HEADER_FORMAT, len(data))\r
+ self.stream.write(header + data)\r
+ finally:\r
+ self.lock.release()\r
+ \r
+ def recv(self):\r
+ """receives a package (blocking)"""\r
+ try:\r
+ self.lock.acquire()\r
+ length, = struct.unpack(self.HEADER_FORMAT, self.stream.read(self.HEADER_SIZE))\r
+ return self.stream.read(length)\r
+ finally:\r
+ self.lock.release()\r
+ \r
+ def close(self):\r
+ return self.stream.close()\r
+\r
+ def fileno(self):\r
+ return self.stream.fileno()\r
+\r
+ def is_available(self):\r
+ return self.stream.is_available()\r
+\r
+ def wait(self):\r
+ return self.stream.wait()\r
+\r
+\r