pull in additional changes from 2.0 branch.
[monitor.git] / monitor / Rpyc / Connection.py
diff --git a/monitor/Rpyc/Connection.py b/monitor/Rpyc/Connection.py
new file mode 100644 (file)
index 0000000..81bed23
--- /dev/null
@@ -0,0 +1,215 @@
+import sys\r
+from Boxing import Box, dump_exception, load_exception\r
+from ModuleNetProxy import RootImporter\r
+from Lib import raise_exception, AttrFrontend\r
+\r
+\r
+class Connection(object):\r
+    """\r
+    the rpyc connection layer (protocol and APIs). generally speaking, the only \r
+    things you'll need to access directly from this object are:\r
+     * modules - represents the remote python interprerer's modules namespace\r
+     * execute - executes the given code on the other side of the connection\r
+     * namespace - the namespace in which the code you `execute` resides\r
+\r
+    the rest of the attributes should be of no intresent to you, except maybe for \r
+    `remote_conn`, which represents the other side of the connection. it is unlikely,\r
+    however, you'll need to use it (it is used interally).\r
+    \r
+    when you are done using a connection, and wish to release the resources it\r
+    uses, you should call close(). you don't have to, but if you don't, the gc\r
+    can't release the memory because of cyclic references.\r
+    """\r
+    \r
+    def __init__(self, channel):\r
+        self._closed = False\r
+        self._local_namespace = {}\r
+        self.channel = channel\r
+        self.box = Box(self)\r
+        self.async_replies = {}\r
+        self.sync_replies = {}\r
+        self.request_seq = 0\r
+        self.module_cache = {}\r
+        # user APIs:\r
+        self.modules = RootImporter(self)\r
+        self.remote_conn = self.sync_request("handle_getconn")\r
+        self.namespace = AttrFrontend(self.remote_conn._local_namespace)\r
+    \r
+    def __repr__(self):\r
+        if self._closed:\r
+            return "<%s - closed>" % (self.__class__.__name__,)\r
+        else:\r
+            return "<%s(%r)>" % (self.__class__.__name__, self.channel)\r
+\r
+    # \r
+    # file api layer\r
+    #\r
+    def close(self):\r
+        if self._closed:\r
+            return\r
+        self._closed = True\r
+        self.box.close()\r
+        self.channel.close()\r
+        # untangle circular references\r
+        del self._local_namespace\r
+        del self.channel\r
+        del self.box\r
+        del self.async_replies\r
+        del self.sync_replies\r
+        del self.request_seq\r
+        del self.module_cache\r
+        del self.modules\r
+        del self.remote_conn\r
+        del self.namespace\r
+    \r
+    def fileno(self):\r
+        return self.channel.fileno()\r
+\r
+    #\r
+    # protocol layer\r
+    #\r
+    def _recv(self):\r
+        return self.box.unpack(self.channel.recv())\r
+\r
+    def _send(self, *args):\r
+        return self.channel.send(self.box.pack(args))\r
+    \r
+    def send_request(self, handlername, *args):\r
+        try:\r
+            self.channel.lock.acquire()\r
+            # this must be atomic {\r
+            self.request_seq += 1 \r
+            self._send(handlername, self.request_seq, args)\r
+            return self.request_seq\r
+            # }\r
+        finally:\r
+            self.channel.lock.release()\r
+\r
+    def send_exception(self, seq, exc_info):\r
+        self._send("exception", seq, dump_exception(*exc_info))\r
+\r
+    def send_result(self, seq, obj):\r
+        self._send("result", seq, obj)\r
+\r
+    def dispatch_result(self, seq, obj):\r
+        if seq in self.async_replies:\r
+            self.async_replies.pop(seq)("result", obj)\r
+        else:        \r
+            self.sync_replies[seq] = obj\r
+    \r
+    def dispatch_exception(self, seq, obj):\r
+        excobj = load_exception(obj)\r
+        if seq in self.async_replies:\r
+            self.async_replies.pop(seq)("exception", excobj)\r
+        else:\r
+            raise_exception(*excobj)\r
+\r
+    def dispatch_request(self, handlername, seq, args):\r
+        try:\r
+            res = getattr(self, handlername)(*args)\r
+        except SystemExit:\r
+            raise\r
+        except:\r
+            self.send_exception(seq, sys.exc_info())\r
+        else:\r
+            self.send_result(seq, res)\r
+\r
+    def sync_request(self, *args):\r
+        """performs a synchronous (blocking) request"""\r
+        seq = self.send_request(*args)\r
+        while seq not in self.sync_replies:\r
+            self.serve()\r
+        return self.sync_replies.pop(seq)\r
+    \r
+    def async_request(self, callback, *args):\r
+        """performs an asynchronous (non-blocking) request"""\r
+        seq = self.send_request(*args)\r
+        self.async_replies[seq] = callback\r
+        \r
+    #\r
+    # servers api\r
+    #\r
+    def poll(self):\r
+        """if available, serves a single request, otherwise returns (non-blocking serve)"""\r
+        if self.channel.is_available():\r
+            self.serve()\r
+            return True\r
+        else:\r
+            return False\r
+    \r
+    def serve(self):\r
+        """serves a single request or reply (may block)"""\r
+        self.channel.wait()\r
+        handler, seq, obj = self._recv()\r
+        if handler == "result":\r
+            self.dispatch_result(seq, obj)\r
+        elif handler == "exception":\r
+            self.dispatch_exception(seq, obj)\r
+        else:\r
+            self.dispatch_request(handler, seq, obj)            \r
+\r
+    #\r
+    # root requests\r
+    #\r
+    def rimport(self, modulename):\r
+        """imports a module by name (as a string)"""\r
+        if modulename not in self.module_cache:\r
+            module = self.sync_request("handle_import", modulename)\r
+            self.module_cache[modulename] = module\r
+        return self.module_cache[modulename]            \r
+\r
+    def execute(self, expr, mode = "exec"):\r
+        """execute the given code at the remote side of the connection"""\r
+        return self.sync_request("handle_execute", expr, mode)\r
+\r
+    #\r
+    # handlers layer\r
+    #\r
+    def handle_incref(self, oid):\r
+        self.box.incref(oid)\r
+    \r
+    def handle_decref(self, oid):\r
+        self.box.decref(oid)\r
+            \r
+    def handle_delattr(self, oid, name):\r
+        delattr(self.box[oid], name)\r
+\r
+    def handle_getattr(self, oid, name):\r
+        return getattr(self.box[oid], name)\r
+\r
+    def handle_setattr(self, oid, name, value):\r
+        setattr(self.box[oid], name, value)\r
+\r
+    def handle_delitem(self, oid, index):\r
+        del self.box[oid][index]\r
+\r
+    def handle_getitem(self, oid, index):\r
+        return self.box[oid][index]\r
+\r
+    def handle_setitem(self, oid, index, value):\r
+        self.box[oid][index] = value\r
+\r
+    def handle_call(self, oid, args, kwargs):\r
+        return self.box[oid](*args, **kwargs)\r
+\r
+    def handle_repr(self, oid):\r
+        return repr(self.box[oid])\r
+\r
+    def handle_str(self, oid):\r
+        return str(self.box[oid])\r
+\r
+    def handle_bool(self, oid):\r
+        return bool(self.box[oid])\r
+\r
+    def handle_import(self, modulename):\r
+        return __import__(modulename, None, None, modulename.split(".")[-1])\r
+\r
+    def handle_getconn(self):\r
+        return self\r
+\r
+    def handle_execute(self, expr, mode):\r
+        codeobj = compile(expr, "<from %s>" % (self,), mode)\r
+        return eval(codeobj, self._local_namespace)\r
+\r
+\r
+\r