pull in additional changes from 2.0 branch.
[monitor.git] / monitor / Rpyc / AsyncNetProxy.py
diff --git a/monitor/Rpyc/AsyncNetProxy.py b/monitor/Rpyc/AsyncNetProxy.py
new file mode 100644 (file)
index 0000000..0c1fc05
--- /dev/null
@@ -0,0 +1,82 @@
+from NetProxy import NetProxyWrapper\r
+from Lib import raise_exception\r
+\r
+\r
+class InvalidAsyncResultState(Exception):\r
+    pass\r
+\r
+\r
+class AsyncNetProxy(NetProxyWrapper):\r
+    """wraps an exiting synchronous netproxy to make is asynchronous \r
+    (remote operations return AsyncResult objects)"""\r
+    __doc__ = NetProxyWrapper.__doc__\r
+\r
+    def __request__(self, handler, *args):\r
+        res = AsyncResult(self.__dict__["____conn"])\r
+        self.__dict__["____conn"].async_request(res.callback, handler, self.__dict__["____oid"], *args)\r
+        return res\r
+\r
+    # must return a string... and it's not meaningful to return the repr of an async result\r
+    def __repr__(self, *args):\r
+        return self.__request__("handle_repr", *args).result\r
+    def __str__(self, *args):\r
+        return self.__request__("handle_str", *args).result      \r
+        \r
+\r
+class AsyncResult(object):\r
+    """represents the result of an asynchronous operation"""\r
+    STATE_PENDING = "pending"\r
+    STATE_READY = "ready"\r
+    STATE_EXCEPTION = "exception"\r
+    STATE_UNKNOWN = "unknown"\r
+    \r
+    def __init__(self, conn):\r
+        self.conn = conn\r
+        self._state = self.STATE_PENDING\r
+        self._result = None\r
+        self._on_ready = None\r
+    \r
+    def __repr__(self):\r
+        return "<AsyncResult (%s) at 0x%08x>" % (self._state, id(self))\r
+    \r
+    def callback(self, event, obj):\r
+        if event == "result":\r
+            self._state = self.STATE_READY\r
+            self._result = obj\r
+        elif event == "exception":\r
+            self._state = self.STATE_EXCEPTION\r
+            self._result = obj\r
+        else:\r
+            self._state = self.STATE_UNKNOWN\r
+            self._result = obj\r
+            \r
+        if self._on_ready is not None:\r
+            self._on_ready(self)\r
+    \r
+    def _get_on_ready(self):\r
+        return self._ready_callback\r
+\r
+    def _set_on_ready(self, obj):\r
+        self._on_ready = obj\r
+        if self._state != self.STATE_PENDING:\r
+            self._on_ready(self)\r
+    \r
+    def _get_is_ready(self):\r
+        if self._state == self.STATE_PENDING:\r
+            self.conn.poll()\r
+        return self._state != self.STATE_PENDING\r
+    \r
+    def _get_result(self):\r
+        while self._state == self.STATE_PENDING:\r
+            self.conn.serve()\r
+        if self._state == self.STATE_READY:\r
+            return self._result\r
+        elif self._state == self.STATE_EXCEPTION:\r
+            raise_exception(*self._result)\r
+        else:\r
+            raise InvalidAsyncResultState(self._state)\r
+            \r
+    is_ready = property(_get_is_ready, doc = "indicates whether or not the result is ready")\r
+    result = property(_get_result, doc = "the value of the async result (may block)")\r
+    on_ready = property(_get_on_ready, _set_on_ready, doc =\r
+        "if not None, specifies a callback which is called when the result is ready")\r