clearer names for actions, and infer actions better
[monitor.git] / monitor / Rpyc / Connection.py
1 import sys\r
2 from Boxing import Box, dump_exception, load_exception\r
3 from ModuleNetProxy import RootImporter\r
4 from Lib import raise_exception, AttrFrontend\r
5 \r
6 \r
7 class Connection(object):\r
8     """\r
9     the rpyc connection layer (protocol and APIs). generally speaking, the only \r
10     things you'll need to access directly from this object are:\r
11      * modules - represents the remote python interprerer's modules namespace\r
12      * execute - executes the given code on the other side of the connection\r
13      * namespace - the namespace in which the code you `execute` resides\r
14 \r
15     the rest of the attributes should be of no intresent to you, except maybe for \r
16     `remote_conn`, which represents the other side of the connection. it is unlikely,\r
17     however, you'll need to use it (it is used interally).\r
18     \r
19     when you are done using a connection, and wish to release the resources it\r
20     uses, you should call close(). you don't have to, but if you don't, the gc\r
21     can't release the memory because of cyclic references.\r
22     """\r
23     \r
24     def __init__(self, channel):\r
25         self._closed = False\r
26         self._local_namespace = {}\r
27         self.channel = channel\r
28         self.box = Box(self)\r
29         self.async_replies = {}\r
30         self.sync_replies = {}\r
31         self.request_seq = 0\r
32         self.module_cache = {}\r
33         # user APIs:\r
34         self.modules = RootImporter(self)\r
35         self.remote_conn = self.sync_request("handle_getconn")\r
36         self.namespace = AttrFrontend(self.remote_conn._local_namespace)\r
37     \r
38     def __repr__(self):\r
39         if self._closed:\r
40             return "<%s - closed>" % (self.__class__.__name__,)\r
41         else:\r
42             return "<%s(%r)>" % (self.__class__.__name__, self.channel)\r
43 \r
44     # \r
45     # file api layer\r
46     #\r
47     def close(self):\r
48         if self._closed:\r
49             return\r
50         self._closed = True\r
51         self.box.close()\r
52         self.channel.close()\r
53         # untangle circular references\r
54         del self._local_namespace\r
55         del self.channel\r
56         del self.box\r
57         del self.async_replies\r
58         del self.sync_replies\r
59         del self.request_seq\r
60         del self.module_cache\r
61         del self.modules\r
62         del self.remote_conn\r
63         del self.namespace\r
64     \r
65     def fileno(self):\r
66         return self.channel.fileno()\r
67 \r
68     #\r
69     # protocol layer\r
70     #\r
71     def _recv(self):\r
72         return self.box.unpack(self.channel.recv())\r
73 \r
74     def _send(self, *args):\r
75         return self.channel.send(self.box.pack(args))\r
76     \r
77     def send_request(self, handlername, *args):\r
78         try:\r
79             self.channel.lock.acquire()\r
80             # this must be atomic {\r
81             self.request_seq += 1 \r
82             self._send(handlername, self.request_seq, args)\r
83             return self.request_seq\r
84             # }\r
85         finally:\r
86             self.channel.lock.release()\r
87 \r
88     def send_exception(self, seq, exc_info):\r
89         self._send("exception", seq, dump_exception(*exc_info))\r
90 \r
91     def send_result(self, seq, obj):\r
92         self._send("result", seq, obj)\r
93 \r
94     def dispatch_result(self, seq, obj):\r
95         if seq in self.async_replies:\r
96             self.async_replies.pop(seq)("result", obj)\r
97         else:        \r
98             self.sync_replies[seq] = obj\r
99     \r
100     def dispatch_exception(self, seq, obj):\r
101         excobj = load_exception(obj)\r
102         if seq in self.async_replies:\r
103             self.async_replies.pop(seq)("exception", excobj)\r
104         else:\r
105             raise_exception(*excobj)\r
106 \r
107     def dispatch_request(self, handlername, seq, args):\r
108         try:\r
109             res = getattr(self, handlername)(*args)\r
110         except SystemExit:\r
111             raise\r
112         except:\r
113             self.send_exception(seq, sys.exc_info())\r
114         else:\r
115             self.send_result(seq, res)\r
116 \r
117     def sync_request(self, *args):\r
118         """performs a synchronous (blocking) request"""\r
119         seq = self.send_request(*args)\r
120         while seq not in self.sync_replies:\r
121             self.serve()\r
122         return self.sync_replies.pop(seq)\r
123     \r
124     def async_request(self, callback, *args):\r
125         """performs an asynchronous (non-blocking) request"""\r
126         seq = self.send_request(*args)\r
127         self.async_replies[seq] = callback\r
128         \r
129     #\r
130     # servers api\r
131     #\r
132     def poll(self):\r
133         """if available, serves a single request, otherwise returns (non-blocking serve)"""\r
134         if self.channel.is_available():\r
135             self.serve()\r
136             return True\r
137         else:\r
138             return False\r
139     \r
140     def serve(self):\r
141         """serves a single request or reply (may block)"""\r
142         self.channel.wait()\r
143         handler, seq, obj = self._recv()\r
144         if handler == "result":\r
145             self.dispatch_result(seq, obj)\r
146         elif handler == "exception":\r
147             self.dispatch_exception(seq, obj)\r
148         else:\r
149             self.dispatch_request(handler, seq, obj)            \r
150 \r
151     #\r
152     # root requests\r
153     #\r
154     def rimport(self, modulename):\r
155         """imports a module by name (as a string)"""\r
156         if modulename not in self.module_cache:\r
157             module = self.sync_request("handle_import", modulename)\r
158             self.module_cache[modulename] = module\r
159         return self.module_cache[modulename]            \r
160 \r
161     def execute(self, expr, mode = "exec"):\r
162         """execute the given code at the remote side of the connection"""\r
163         return self.sync_request("handle_execute", expr, mode)\r
164 \r
165     #\r
166     # handlers layer\r
167     #\r
168     def handle_incref(self, oid):\r
169         self.box.incref(oid)\r
170     \r
171     def handle_decref(self, oid):\r
172         self.box.decref(oid)\r
173             \r
174     def handle_delattr(self, oid, name):\r
175         delattr(self.box[oid], name)\r
176 \r
177     def handle_getattr(self, oid, name):\r
178         return getattr(self.box[oid], name)\r
179 \r
180     def handle_setattr(self, oid, name, value):\r
181         setattr(self.box[oid], name, value)\r
182 \r
183     def handle_delitem(self, oid, index):\r
184         del self.box[oid][index]\r
185 \r
186     def handle_getitem(self, oid, index):\r
187         return self.box[oid][index]\r
188 \r
189     def handle_setitem(self, oid, index, value):\r
190         self.box[oid][index] = value\r
191 \r
192     def handle_call(self, oid, args, kwargs):\r
193         return self.box[oid](*args, **kwargs)\r
194 \r
195     def handle_repr(self, oid):\r
196         return repr(self.box[oid])\r
197 \r
198     def handle_str(self, oid):\r
199         return str(self.box[oid])\r
200 \r
201     def handle_bool(self, oid):\r
202         return bool(self.box[oid])\r
203 \r
204     def handle_import(self, modulename):\r
205         return __import__(modulename, None, None, modulename.split(".")[-1])\r
206 \r
207     def handle_getconn(self):\r
208         return self\r
209 \r
210     def handle_execute(self, expr, mode):\r
211         codeobj = compile(expr, "<from %s>" % (self,), mode)\r
212         return eval(codeobj, self._local_namespace)\r
213 \r
214 \r
215 \r