--- /dev/null
+import sys\r
+from Boxing import Box, dump_exception, load_exception\r
+from ModuleNetProxy import RootImporter\r
+from Lib import raise_exception, AttrFrontend\r
+\r
+\r
+class Connection(object):\r
+ """\r
+ the rpyc connection layer (protocol and APIs). generally speaking, the only \r
+ things you'll need to access directly from this object are:\r
+ * modules - represents the remote python interprerer's modules namespace\r
+ * execute - executes the given code on the other side of the connection\r
+ * namespace - the namespace in which the code you `execute` resides\r
+\r
+ the rest of the attributes should be of no intresent to you, except maybe for \r
+ `remote_conn`, which represents the other side of the connection. it is unlikely,\r
+ however, you'll need to use it (it is used interally).\r
+ \r
+ when you are done using a connection, and wish to release the resources it\r
+ uses, you should call close(). you don't have to, but if you don't, the gc\r
+ can't release the memory because of cyclic references.\r
+ """\r
+ \r
+ def __init__(self, channel):\r
+ self._closed = False\r
+ self._local_namespace = {}\r
+ self.channel = channel\r
+ self.box = Box(self)\r
+ self.async_replies = {}\r
+ self.sync_replies = {}\r
+ self.request_seq = 0\r
+ self.module_cache = {}\r
+ # user APIs:\r
+ self.modules = RootImporter(self)\r
+ self.remote_conn = self.sync_request("handle_getconn")\r
+ self.namespace = AttrFrontend(self.remote_conn._local_namespace)\r
+ \r
+ def __repr__(self):\r
+ if self._closed:\r
+ return "<%s - closed>" % (self.__class__.__name__,)\r
+ else:\r
+ return "<%s(%r)>" % (self.__class__.__name__, self.channel)\r
+\r
+ # \r
+ # file api layer\r
+ #\r
+ def close(self):\r
+ if self._closed:\r
+ return\r
+ self._closed = True\r
+ self.box.close()\r
+ self.channel.close()\r
+ # untangle circular references\r
+ del self._local_namespace\r
+ del self.channel\r
+ del self.box\r
+ del self.async_replies\r
+ del self.sync_replies\r
+ del self.request_seq\r
+ del self.module_cache\r
+ del self.modules\r
+ del self.remote_conn\r
+ del self.namespace\r
+ \r
+ def fileno(self):\r
+ return self.channel.fileno()\r
+\r
+ #\r
+ # protocol layer\r
+ #\r
+ def _recv(self):\r
+ return self.box.unpack(self.channel.recv())\r
+\r
+ def _send(self, *args):\r
+ return self.channel.send(self.box.pack(args))\r
+ \r
+ def send_request(self, handlername, *args):\r
+ try:\r
+ self.channel.lock.acquire()\r
+ # this must be atomic {\r
+ self.request_seq += 1 \r
+ self._send(handlername, self.request_seq, args)\r
+ return self.request_seq\r
+ # }\r
+ finally:\r
+ self.channel.lock.release()\r
+\r
+ def send_exception(self, seq, exc_info):\r
+ self._send("exception", seq, dump_exception(*exc_info))\r
+\r
+ def send_result(self, seq, obj):\r
+ self._send("result", seq, obj)\r
+\r
+ def dispatch_result(self, seq, obj):\r
+ if seq in self.async_replies:\r
+ self.async_replies.pop(seq)("result", obj)\r
+ else: \r
+ self.sync_replies[seq] = obj\r
+ \r
+ def dispatch_exception(self, seq, obj):\r
+ excobj = load_exception(obj)\r
+ if seq in self.async_replies:\r
+ self.async_replies.pop(seq)("exception", excobj)\r
+ else:\r
+ raise_exception(*excobj)\r
+\r
+ def dispatch_request(self, handlername, seq, args):\r
+ try:\r
+ res = getattr(self, handlername)(*args)\r
+ except SystemExit:\r
+ raise\r
+ except:\r
+ self.send_exception(seq, sys.exc_info())\r
+ else:\r
+ self.send_result(seq, res)\r
+\r
+ def sync_request(self, *args):\r
+ """performs a synchronous (blocking) request"""\r
+ seq = self.send_request(*args)\r
+ while seq not in self.sync_replies:\r
+ self.serve()\r
+ return self.sync_replies.pop(seq)\r
+ \r
+ def async_request(self, callback, *args):\r
+ """performs an asynchronous (non-blocking) request"""\r
+ seq = self.send_request(*args)\r
+ self.async_replies[seq] = callback\r
+ \r
+ #\r
+ # servers api\r
+ #\r
+ def poll(self):\r
+ """if available, serves a single request, otherwise returns (non-blocking serve)"""\r
+ if self.channel.is_available():\r
+ self.serve()\r
+ return True\r
+ else:\r
+ return False\r
+ \r
+ def serve(self):\r
+ """serves a single request or reply (may block)"""\r
+ self.channel.wait()\r
+ handler, seq, obj = self._recv()\r
+ if handler == "result":\r
+ self.dispatch_result(seq, obj)\r
+ elif handler == "exception":\r
+ self.dispatch_exception(seq, obj)\r
+ else:\r
+ self.dispatch_request(handler, seq, obj) \r
+\r
+ #\r
+ # root requests\r
+ #\r
+ def rimport(self, modulename):\r
+ """imports a module by name (as a string)"""\r
+ if modulename not in self.module_cache:\r
+ module = self.sync_request("handle_import", modulename)\r
+ self.module_cache[modulename] = module\r
+ return self.module_cache[modulename] \r
+\r
+ def execute(self, expr, mode = "exec"):\r
+ """execute the given code at the remote side of the connection"""\r
+ return self.sync_request("handle_execute", expr, mode)\r
+\r
+ #\r
+ # handlers layer\r
+ #\r
+ def handle_incref(self, oid):\r
+ self.box.incref(oid)\r
+ \r
+ def handle_decref(self, oid):\r
+ self.box.decref(oid)\r
+ \r
+ def handle_delattr(self, oid, name):\r
+ delattr(self.box[oid], name)\r
+\r
+ def handle_getattr(self, oid, name):\r
+ return getattr(self.box[oid], name)\r
+\r
+ def handle_setattr(self, oid, name, value):\r
+ setattr(self.box[oid], name, value)\r
+\r
+ def handle_delitem(self, oid, index):\r
+ del self.box[oid][index]\r
+\r
+ def handle_getitem(self, oid, index):\r
+ return self.box[oid][index]\r
+\r
+ def handle_setitem(self, oid, index, value):\r
+ self.box[oid][index] = value\r
+\r
+ def handle_call(self, oid, args, kwargs):\r
+ return self.box[oid](*args, **kwargs)\r
+\r
+ def handle_repr(self, oid):\r
+ return repr(self.box[oid])\r
+\r
+ def handle_str(self, oid):\r
+ return str(self.box[oid])\r
+\r
+ def handle_bool(self, oid):\r
+ return bool(self.box[oid])\r
+\r
+ def handle_import(self, modulename):\r
+ return __import__(modulename, None, None, modulename.split(".")[-1])\r
+\r
+ def handle_getconn(self):\r
+ return self\r
+\r
+ def handle_execute(self, expr, mode):\r
+ codeobj = compile(expr, "<from %s>" % (self,), mode)\r
+ return eval(codeobj, self._local_namespace)\r
+\r
+\r
+\r