From b47eae8b19d6e2d9937516a7c1b44a8c43cadab8 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Tue, 28 Jun 2011 17:46:40 +0200 Subject: [PATCH] Deferred versions of proxied methods automatically injected that return deferred proxies. Allow command pipelining for more efficient communication. --- src/nepi/core/execute.py | 6 +++ src/nepi/util/defer.py | 30 +++++++++++++ src/nepi/util/proxy.py | 14 ++++++- src/nepi/util/proxy_stub.tpl | 18 ++++++++ src/nepi/util/server.py | 81 +++++++++++++++++++++++++++++++----- 5 files changed, 137 insertions(+), 12 deletions(-) create mode 100644 src/nepi/util/defer.py diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index ecd49c92..f8d1a9a5 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -612,6 +612,12 @@ class ExperimentController(object): return testbed.get(guid, name, time) raise RuntimeError("No element exists with guid %d" % guid) + def get_deferred(self, guid, name, time = TIME_NOW): + testbed = self._testbed_for_guid(guid) + if testbed != None: + return testbed.get_deferred(guid, name, time) + raise RuntimeError("No element exists with guid %d" % guid) + def get_factory_id(self, guid): testbed = self._testbed_for_guid(guid) if testbed != None: diff --git a/src/nepi/util/defer.py b/src/nepi/util/defer.py new file mode 100644 index 00000000..732d0238 --- /dev/null +++ b/src/nepi/util/defer.py @@ -0,0 +1,30 @@ +class Defer: + class NONE: + pass + + def __init__(self, ojetwait): + self.__ojet = Defer.NONE + self.__ojetwait = ojetwait + def __getattr__(self, attr): + if attr in ('_Defer__ojet', '_Defer__ojetwait', '_get'): + try: + return self.__dict__[attr] + except KeyError: + raise AttributeError, attr + else: + if self.__ojet is Defer.NONE: + self.__ojet = self.__ojetwait() + return getattr(self.__ojet, attr) + def __setattr__(self, attr, value): + if attr in ('_Defer__ojet', '_Defer__ojetwait'): + self.__dict__[attr] = value + else: + if self.__ojet is Defer.NONE: + self.__ojet = self.__ojetwait() + self.__ojetwait = None + return setattr(self.__ojet, attr, value) + def _get(self): + if self.__ojet is Defer.NONE: + self.__ojet = self.__ojetwait() + return self.__ojet + diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index b13c8d20..9dcb5c92 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -902,7 +902,11 @@ class BaseProxy(object): func_template = func_template_file.read() func_template_file.close() - for methname in vars(template_class): + for methname in vars(template_class).copy(): + if methname.endswith('_deferred'): + # cannot wrap deferreds... + continue + dmethname = methname+'_deferred' if hasattr(server_class, methname) and not methname.startswith('_'): template_meth = getattr(template_class, methname) server_meth = getattr(server_class, methname) @@ -930,6 +934,7 @@ class BaseProxy(object): argtypes = argtypes, argencoders = argencoders, rvtype = rvtype, + functools = functools, ) context = dict() @@ -955,8 +960,15 @@ class BaseProxy(object): if doprop: rv[methname] = property(context[methname]) + rv[dmethname] = property(context[dmethname]) else: rv[methname] = context[methname] + rv[dmethname] = context[dmethname] + + # inject _deferred into core classes + if hasattr(template_class, methname) and not hasattr(template_class, dmethname): + setattr(template_class, dmethname, + getattr(template_class, methname)) return rv diff --git a/src/nepi/util/proxy_stub.tpl b/src/nepi/util/proxy_stub.tpl index a4881f97..7161c6ef 100644 --- a/src/nepi/util/proxy_stub.tpl +++ b/src/nepi/util/proxy_stub.tpl @@ -15,3 +15,21 @@ def %(methname)s(%(self)s, %(argdefs)s): reply) return rv +def %(methname)s_deferred(%(self)s, %(argdefs)s): + msg = BaseProxy._make_message( + argtypes, + argencoders, + %(command)d, + %(methname)r, + %(classname)r, + %(args)s) + %(self)s._client.send_msg(msg) + rv = %(self)s._client.defer_reply( + transform = functools.partial( + BaseProxy._parse_reply, + rvtype, + %(methname)r+'_deferred', + %(classname)r) + ) + return rv + diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index d6079af5..7f5a006e 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -16,6 +16,9 @@ import traceback import signal import re import tempfile +import defer +import functools +import collections CTRL_SOCK = "ctrl.sock" STD_ERR = "stderr.log" @@ -72,6 +75,7 @@ class Server(object): self._stop = False self._ctrl_sock = None self._log_level = log_level + self._rdbuf = "" def run(self): try: @@ -195,8 +199,9 @@ class Server(object): self.log_error() def recv_msg(self, conn): - data = "" - while True: + data = [self._rdbuf] + chunk = data[0] + while '\n' not in chunk: try: chunk = conn.recv(1024) except OSError, e: @@ -205,12 +210,15 @@ class Server(object): if chunk == '': continue if chunk: - data += chunk - if chunk[-1] == "\n": - break + data.append(chunk) else: # empty chunk = EOF break + data = ''.join(data).split('\n',1) + while len(data) < 2: + data.append('') + data, self._rdbuf = data + decoded = base64.b64decode(data) return decoded.rstrip() @@ -250,6 +258,7 @@ class Forwarder(object): self._ctrl_sock = None self._root_dir = root_dir self._stop = False + self._rdbuf = "" def forward(self): self.connect() @@ -284,8 +293,9 @@ class Forwarder(object): self._stop = True def recv_from_server(self): - data = "" - while True: + data = [self._rdbuf] + chunk = data[0] + while '\n' not in chunk: try: chunk = self._ctrl_sock.recv(1024) except OSError, e: @@ -293,10 +303,17 @@ class Forwarder(object): raise if chunk == '': continue - data += chunk - if chunk[-1] == "\n": + if chunk: + data.append(chunk) + else: + # empty chunk = EOF break - return data + data = ''.join(data).split('\n',1) + while len(data) < 2: + data.append('') + data, self._rdbuf = data + + return data+'\n' def connect(self): self.disconnect() @@ -319,6 +336,7 @@ class Client(object): self.agent = agent self.environment_setup = environment_setup self._stopped = False + self._deferreds = collections.deque() self.connect() def __del__(self): @@ -377,10 +395,51 @@ class Client(object): self.send_msg(STOP_MSG) self._stopped = True - def read_reply(self): + def defer_reply(self, transform=None): + defer_entry = [] + self._deferreds.append(defer_entry) + return defer.Defer( + functools.partial(self.read_reply, defer_entry, transform) + ) + + def _read_reply(self): data = self._process.stdout.readline() encoded = data.rstrip() return base64.b64decode(encoded) + + def read_reply(self, which=None, transform=None): + # Test to see if someone did it already + if which is not None and len(which): + # Ok, they did it... + # ...just return the deferred value + if transform: + return transform(which[0]) + else: + return which[0] + + # Process all deferreds until the one we're looking for + # or until the queue is empty + while self._deferreds: + try: + deferred = self._deferreds.popleft() + except IndexError: + # emptied + break + + deferred.append(self._read_reply()) + if deferred is which: + # We reached the one we were looking for + if transform: + return transform(deferred[0]) + else: + return deferred[0] + + if which is None: + # They've requested a synchronous read + if transform: + return transform(self._read_reply()) + else: + return self._read_reply() def _make_server_key_args(server_key, host, port, args): """ -- 2.47.0