2 from Boxing import Box, dump_exception, load_exception
\r
3 from ModuleNetProxy import RootImporter
\r
4 from Lib import raise_exception, AttrFrontend
\r
7 class Connection(object):
\r
9 the rpyc connection layer (protocol and APIs). generally speaking, the only
\r
10 things you'll need to access directly from this object are:
\r
11 * modules - represents the remote python interpreter's modules namespace
\r
12 * execute - executes the given code on the other side of the connection
\r
13 * namespace - the namespace in which the code you `execute` resides
\r
15 the rest of the attributes should be of no interest to you, except maybe for
\r
16 `remote_conn`, which represents the other side of the connection. it is unlikely,
\r
17 however, you'll need to use it (it is used internally).
\r
19 when you are done using a connection, and wish to release the resources it
\r
20 uses, you should call close(). you don't have to, but if you don't, the gc
\r
21 can't release the memory because of cyclic references.
\r
def __init__(self, channel):
    """
    set up the connection state on top of `channel` and fetch the remote
    side's connection object.

    channel - the transport object this connection sends and receives through

    note: construction issues a synchronous "handle_getconn" request, so it
    blocks until the other side has answered.
    """
    self._closed = False
    self._local_namespace = {}  # globals used by handle_execute
    self.channel = channel
    self.box = Box(self)
    self.async_replies = {}  # seq -> callback waiting for that reply
    self.sync_replies = {}  # seq -> result stored for a blocking request
    self.request_seq = 0  # sequence number of the last request sent
    self.module_cache = {}  # module name -> result of "handle_import"
    # user-facing APIs
    self.modules = RootImporter(self)
    self.remote_conn = self.sync_request("handle_getconn")
    self.namespace = AttrFrontend(self.remote_conn._local_namespace)
\r
40 return "<%s - closed>" % (self.__class__.__name__,)
\r
42 return "<%s(%r)>" % (self.__class__.__name__, self.channel)
\r
52 self.channel.close()
\r
53 # untangle circular references
\r
54 del self._local_namespace
\r
57 del self.async_replies
\r
58 del self.sync_replies
\r
59 del self.request_seq
\r
60 del self.module_cache
\r
62 del self.remote_conn
\r
66 return self.channel.fileno()
\r
72 return self.box.unpack(self.channel.recv())
\r
def _send(self, *args):
    """
    pack the positional arguments (as one tuple) with the connection's box
    and send the packed data over the channel.

    returns whatever `channel.send` returns.
    """
    packed = self.box.pack(args)
    return self.channel.send(packed)
\r
def send_request(self, handlername, *args):
    """
    send a request for `handlername` with `args` to the other side.

    the sequence number is incremented and the request is sent while holding
    the channel's lock, so the two steps are atomic with respect to other
    users of that lock.

    returns the sequence number assigned to the request.
    """
    # acquire outside the try block: if acquiring fails there is nothing to
    # release, and releasing an unheld lock would mask the real error
    self.channel.lock.acquire()
    try:
        # this must be atomic {
        self.request_seq += 1
        self._send(handlername, self.request_seq, args)
        return self.request_seq
        # }
    finally:
        self.channel.lock.release()
\r
def send_exception(self, seq, exc_info):
    """
    send an "exception" reply for request `seq`.

    exc_info - a sequence that is unpacked as the positional arguments of
               `dump_exception`; the dumped form is what gets sent
    """
    dumped = dump_exception(*exc_info)
    self._send("exception", seq, dumped)
\r
def send_result(self, seq, obj):
    """send a "result" reply carrying `obj` for request `seq`"""
    self._send("result", seq, obj)
\r
def dispatch_result(self, seq, obj):
    """
    route a received result to whoever is waiting for request `seq`.

    if a callback is registered for `seq` in `async_replies`, it is removed
    and called as callback("result", obj); otherwise `obj` is stored in
    `sync_replies` under `seq`.
    """
    if seq in self.async_replies:
        callback = self.async_replies.pop(seq)
        callback("result", obj)
    else:
        self.sync_replies[seq] = obj
\r
def dispatch_exception(self, seq, obj):
    """
    route a received exception to whoever is waiting for request `seq`.

    `obj` is first converted with `load_exception`. if a callback is
    registered for `seq` in `async_replies`, it is removed and called as
    callback("exception", excobj); otherwise the loaded exception is passed
    (unpacked) to `raise_exception`.
    """
    excobj = load_exception(obj)
    if seq in self.async_replies:
        callback = self.async_replies.pop(seq)
        callback("exception", excobj)
    else:
        raise_exception(*excobj)
\r
107 def dispatch_request(self, handlername, seq, args):
\r
109 res = getattr(self, handlername)(*args)
\r
113 self.send_exception(seq, sys.exc_info())
\r
115 self.send_result(seq, res)
\r
117 def sync_request(self, *args):
\r
118 """performs a synchronous (blocking) request"""
\r
119 seq = self.send_request(*args)
\r
120 while seq not in self.sync_replies:
\r
122 return self.sync_replies.pop(seq)
\r
def async_request(self, callback, *args):
    """
    performs an asynchronous (non-blocking) request

    callback - stored in `async_replies` under the request's sequence number;
               the dispatch methods call it as callback("result", obj) or
               callback("exception", excobj) when the reply is dispatched
    args     - forwarded unchanged to `send_request`
    """
    seq = self.send_request(*args)
    self.async_replies[seq] = callback
\r
133 """if available, serves a single request, otherwise returns (non-blocking serve)"""
\r
134 if self.channel.is_available():
\r
141 """serves a single request or reply (may block)"""
\r
142 self.channel.wait()
\r
143 handler, seq, obj = self._recv()
\r
144 if handler == "result":
\r
145 self.dispatch_result(seq, obj)
\r
146 elif handler == "exception":
\r
147 self.dispatch_exception(seq, obj)
\r
149 self.dispatch_request(handler, seq, obj)
\r
def rimport(self, modulename):
    """
    imports a module by name (as a string)

    the first request for a given name issues a synchronous "handle_import"
    request; the result is kept in `module_cache` and returned directly on
    later calls.
    """
    cache = self.module_cache
    if modulename not in cache:
        cache[modulename] = self.sync_request("handle_import", modulename)
    return cache[modulename]
\r
def execute(self, expr, mode="exec"):
    """
    execute the given code at the remote side of the connection

    expr - the source code to run, as a string
    mode - forwarded with `expr` in the "handle_execute" request, where it is
           used as the `compile` mode (defaults to "exec")

    returns the result of the synchronous "handle_execute" request.
    """
    return self.sync_request("handle_execute", expr, mode)
\r
def handle_incref(self, oid):
    """request handler: forward `oid` to the box's `incref`"""
    self.box.incref(oid)
\r
def handle_decref(self, oid):
    """request handler: forward `oid` to the box's `decref`"""
    self.box.decref(oid)
\r
def handle_delattr(self, oid, name):
    """request handler: delete attribute `name` from the boxed object `oid`"""
    target = self.box[oid]
    delattr(target, name)
\r
def handle_getattr(self, oid, name):
    """request handler: return attribute `name` of the boxed object `oid`"""
    target = self.box[oid]
    return getattr(target, name)
\r
def handle_setattr(self, oid, name, value):
    """request handler: set attribute `name` of the boxed object `oid` to `value`"""
    target = self.box[oid]
    setattr(target, name, value)
\r
def handle_delitem(self, oid, index):
    """request handler: delete item `index` from the boxed object `oid`"""
    target = self.box[oid]
    del target[index]
\r
def handle_getitem(self, oid, index):
    """request handler: return item `index` of the boxed object `oid`"""
    target = self.box[oid]
    return target[index]
\r
def handle_setitem(self, oid, index, value):
    """request handler: store `value` at `index` in the boxed object `oid`"""
    target = self.box[oid]
    target[index] = value
\r
def handle_call(self, oid, args, kwargs):
    """
    request handler: call the boxed object `oid` and return its result.

    args   - sequence unpacked as the positional arguments
    kwargs - mapping unpacked as the keyword arguments
    """
    func = self.box[oid]
    return func(*args, **kwargs)
\r
def handle_repr(self, oid):
    """request handler: return repr() of the boxed object `oid`"""
    target = self.box[oid]
    return repr(target)
\r
def handle_str(self, oid):
    """request handler: return str() of the boxed object `oid`"""
    target = self.box[oid]
    return str(target)
\r
def handle_bool(self, oid):
    """request handler: return the truth value of the boxed object `oid`"""
    target = self.box[oid]
    return bool(target)
\r
def handle_import(self, modulename):
    """
    request handler: import `modulename` locally and return the module.

    a non-empty fromlist makes __import__ return the module named by the
    full dotted path (e.g. `os.path`) rather than the top-level package.
    """
    leaf = modulename.split(".")[-1]
    # fromlist is documented as a list of names; a bare string is iterated
    # character by character, so for packages each letter was looked up as a
    # possible submodule
    return __import__(modulename, None, None, [leaf])
\r
207 def handle_getconn(self):
\r
def handle_execute(self, expr, mode):
    """
    request handler: compile `expr` with the given `mode` and evaluate it in
    this connection's local namespace.

    expr - source code, as a string
    mode - the `compile` mode ("exec", "eval" or "single")

    returns the value produced by eval(); names bound by the code are kept
    in `_local_namespace`.
    """
    # NOTE(security): this runs whatever code the other side sends, with no
    # sandboxing -- only use the connection with a peer you trust
    filename = "<from %s>" % (self,)
    codeobj = compile(expr, filename, mode)
    return eval(codeobj, self._local_namespace)
\r