X-Git-Url: http://git.onelab.eu/?p=monitor.git;a=blobdiff_plain;f=monitor%2FRpyc%2FChannel.py;fp=monitor%2FRpyc%2FChannel.py;h=106f936899d6fa05e1c7c0eb30ca11c7852a03f4;hp=0000000000000000000000000000000000000000;hb=334378a14103c3fd02332b6ce3767553f1fe11d2;hpb=486326759a86f1315d93aeaccf6e2641af2bd9d9 diff --git a/monitor/Rpyc/Channel.py b/monitor/Rpyc/Channel.py new file mode 100644 index 0000000..106f936 --- /dev/null +++ b/monitor/Rpyc/Channel.py @@ -0,0 +1,48 @@ +from threading import RLock +import struct + + +class Channel(object): + """a channel transfers packages over a stream. a package is any blob of data, + up to 4GB in size. channels are gauranteed to be thread-safe""" + HEADER_FORMAT = ">L" # byte order must be the same at both sides! + HEADER_SIZE = struct.calcsize(HEADER_FORMAT) + + def __init__(self, stream): + self.lock = RLock() + self.stream = stream + + def __repr__(self): + return "<%s(%r)>" % (self.__class__.__name__, self.stream) + + def send(self, data): + """sends a package""" + try: + self.lock.acquire() + header = struct.pack(self.HEADER_FORMAT, len(data)) + self.stream.write(header + data) + finally: + self.lock.release() + + def recv(self): + """receives a package (blocking)""" + try: + self.lock.acquire() + length, = struct.unpack(self.HEADER_FORMAT, self.stream.read(self.HEADER_SIZE)) + return self.stream.read(length) + finally: + self.lock.release() + + def close(self): + return self.stream.close() + + def fileno(self): + return self.stream.fileno() + + def is_available(self): + return self.stream.is_available() + + def wait(self): + return self.stream.wait() + +