+++ /dev/null
-import sys\r
-from Boxing import Box, dump_exception, load_exception\r
-from ModuleNetProxy import RootImporter\r
-from Lib import raise_exception, AttrFrontend\r
-\r
-\r
-class Connection(object):\r
- """\r
- the rpyc connection layer (protocol and APIs). generally speaking, the only \r
- things you'll need to access directly from this object are:\r
- * modules - represents the remote python interprerer's modules namespace\r
- * execute - executes the given code on the other side of the connection\r
- * namespace - the namespace in which the code you `execute` resides\r
-\r
- the rest of the attributes should be of no intresent to you, except maybe for \r
- `remote_conn`, which represents the other side of the connection. it is unlikely,\r
- however, you'll need to use it (it is used interally).\r
- \r
- when you are done using a connection, and wish to release the resources it\r
- uses, you should call close(). you don't have to, but if you don't, the gc\r
- can't release the memory because of cyclic references.\r
- """\r
- \r
- def __init__(self, channel):\r
- self._closed = False\r
- self._local_namespace = {}\r
- self.channel = channel\r
- self.box = Box(self)\r
- self.async_replies = {}\r
- self.sync_replies = {}\r
- self.request_seq = 0\r
- self.module_cache = {}\r
- # user APIs:\r
- self.modules = RootImporter(self)\r
- self.remote_conn = self.sync_request("handle_getconn")\r
- self.namespace = AttrFrontend(self.remote_conn._local_namespace)\r
- \r
- def __repr__(self):\r
- if self._closed:\r
- return "<%s - closed>" % (self.__class__.__name__,)\r
- else:\r
- return "<%s(%r)>" % (self.__class__.__name__, self.channel)\r
-\r
- # \r
- # file api layer\r
- #\r
- def close(self):\r
- if self._closed:\r
- return\r
- self._closed = True\r
- self.box.close()\r
- self.channel.close()\r
- # untangle circular references\r
- del self._local_namespace\r
- del self.channel\r
- del self.box\r
- del self.async_replies\r
- del self.sync_replies\r
- del self.request_seq\r
- del self.module_cache\r
- del self.modules\r
- del self.remote_conn\r
- del self.namespace\r
- \r
- def fileno(self):\r
- return self.channel.fileno()\r
-\r
- #\r
- # protocol layer\r
- #\r
- def _recv(self):\r
- return self.box.unpack(self.channel.recv())\r
-\r
- def _send(self, *args):\r
- return self.channel.send(self.box.pack(args))\r
- \r
- def send_request(self, handlername, *args):\r
- try:\r
- self.channel.lock.acquire()\r
- # this must be atomic {\r
- self.request_seq += 1 \r
- self._send(handlername, self.request_seq, args)\r
- return self.request_seq\r
- # }\r
- finally:\r
- self.channel.lock.release()\r
-\r
- def send_exception(self, seq, exc_info):\r
- self._send("exception", seq, dump_exception(*exc_info))\r
-\r
- def send_result(self, seq, obj):\r
- self._send("result", seq, obj)\r
-\r
- def dispatch_result(self, seq, obj):\r
- if seq in self.async_replies:\r
- self.async_replies.pop(seq)("result", obj)\r
- else: \r
- self.sync_replies[seq] = obj\r
- \r
- def dispatch_exception(self, seq, obj):\r
- excobj = load_exception(obj)\r
- if seq in self.async_replies:\r
- self.async_replies.pop(seq)("exception", excobj)\r
- else:\r
- raise_exception(*excobj)\r
-\r
- def dispatch_request(self, handlername, seq, args):\r
- try:\r
- res = getattr(self, handlername)(*args)\r
- except SystemExit:\r
- raise\r
- except:\r
- self.send_exception(seq, sys.exc_info())\r
- else:\r
- self.send_result(seq, res)\r
-\r
- def sync_request(self, *args):\r
- """performs a synchronous (blocking) request"""\r
- seq = self.send_request(*args)\r
- while seq not in self.sync_replies:\r
- self.serve()\r
- return self.sync_replies.pop(seq)\r
- \r
- def async_request(self, callback, *args):\r
- """performs an asynchronous (non-blocking) request"""\r
- seq = self.send_request(*args)\r
- self.async_replies[seq] = callback\r
- \r
- #\r
- # servers api\r
- #\r
- def poll(self):\r
- """if available, serves a single request, otherwise returns (non-blocking serve)"""\r
- if self.channel.is_available():\r
- self.serve()\r
- return True\r
- else:\r
- return False\r
- \r
- def serve(self):\r
- """serves a single request or reply (may block)"""\r
- self.channel.wait()\r
- handler, seq, obj = self._recv()\r
- if handler == "result":\r
- self.dispatch_result(seq, obj)\r
- elif handler == "exception":\r
- self.dispatch_exception(seq, obj)\r
- else:\r
- self.dispatch_request(handler, seq, obj) \r
-\r
- #\r
- # root requests\r
- #\r
- def rimport(self, modulename):\r
- """imports a module by name (as a string)"""\r
- if modulename not in self.module_cache:\r
- module = self.sync_request("handle_import", modulename)\r
- self.module_cache[modulename] = module\r
- return self.module_cache[modulename] \r
-\r
- def execute(self, expr, mode = "exec"):\r
- """execute the given code at the remote side of the connection"""\r
- return self.sync_request("handle_execute", expr, mode)\r
-\r
- #\r
- # handlers layer\r
- #\r
- def handle_incref(self, oid):\r
- self.box.incref(oid)\r
- \r
- def handle_decref(self, oid):\r
- self.box.decref(oid)\r
- \r
- def handle_delattr(self, oid, name):\r
- delattr(self.box[oid], name)\r
-\r
- def handle_getattr(self, oid, name):\r
- return getattr(self.box[oid], name)\r
-\r
- def handle_setattr(self, oid, name, value):\r
- setattr(self.box[oid], name, value)\r
-\r
- def handle_delitem(self, oid, index):\r
- del self.box[oid][index]\r
-\r
- def handle_getitem(self, oid, index):\r
- return self.box[oid][index]\r
-\r
- def handle_setitem(self, oid, index, value):\r
- self.box[oid][index] = value\r
-\r
- def handle_call(self, oid, args, kwargs):\r
- return self.box[oid](*args, **kwargs)\r
-\r
- def handle_repr(self, oid):\r
- return repr(self.box[oid])\r
-\r
- def handle_str(self, oid):\r
- return str(self.box[oid])\r
-\r
- def handle_bool(self, oid):\r
- return bool(self.box[oid])\r
-\r
- def handle_import(self, modulename):\r
- return __import__(modulename, None, None, modulename.split(".")[-1])\r
-\r
- def handle_getconn(self):\r
- return self\r
-\r
- def handle_execute(self, expr, mode):\r
- codeobj = compile(expr, "<from %s>" % (self,), mode)\r
- return eval(codeobj, self._local_namespace)\r
-\r
-\r
-\r