Deferred versions of proxied methods automatically injected that return deferred...
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 28 Jun 2011 15:46:40 +0000 (17:46 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 28 Jun 2011 15:46:40 +0000 (17:46 +0200)
Allow command pipelining for more efficient communication.

src/nepi/core/execute.py
src/nepi/util/defer.py [new file with mode: 0644]
src/nepi/util/proxy.py
src/nepi/util/proxy_stub.tpl
src/nepi/util/server.py

index ecd49c9..f8d1a9a 100644 (file)
@@ -612,6 +612,12 @@ class ExperimentController(object):
             return testbed.get(guid, name, time)
         raise RuntimeError("No element exists with guid %d" % guid)    
 
+    def get_deferred(self, guid, name, time = TIME_NOW):
+        testbed = self._testbed_for_guid(guid)
+        if testbed != None:
+            return testbed.get_deferred(guid, name, time)
+        raise RuntimeError("No element exists with guid %d" % guid)    
+
     def get_factory_id(self, guid):
         testbed = self._testbed_for_guid(guid)
         if testbed != None:
diff --git a/src/nepi/util/defer.py b/src/nepi/util/defer.py
new file mode 100644 (file)
index 0000000..732d023
--- /dev/null
@@ -0,0 +1,30 @@
+class Defer:
+    class NONE:
+        pass
+    
+    def __init__(self, ojetwait):
+        self.__ojet = Defer.NONE
+        self.__ojetwait = ojetwait
+    def __getattr__(self, attr):
+        if attr in ('_Defer__ojet', '_Defer__ojetwait', '_get'):
+            try:
+                return self.__dict__[attr]
+            except KeyError:
+                raise AttributeError, attr
+        else:
+            if self.__ojet is Defer.NONE:
+                self.__ojet = self.__ojetwait()
+            return getattr(self.__ojet, attr)
+    def __setattr__(self, attr, value):
+        if attr in ('_Defer__ojet', '_Defer__ojetwait'):
+            self.__dict__[attr] = value
+        else:
+            if self.__ojet is Defer.NONE:
+                self.__ojet = self.__ojetwait()
+                self.__ojetwait = None
+            return setattr(self.__ojet, attr, value)
+    def _get(self):
+        if self.__ojet is Defer.NONE:
+            self.__ojet = self.__ojetwait()
+        return self.__ojet
+
index b13c8d2..9dcb5c9 100644 (file)
@@ -902,7 +902,11 @@ class BaseProxy(object):
         func_template = func_template_file.read()
         func_template_file.close()
         
-        for methname in vars(template_class):
+        for methname in vars(template_class).copy():
+            if methname.endswith('_deferred'):
+                # cannot wrap deferreds...
+                continue
+            dmethname = methname+'_deferred'
             if hasattr(server_class, methname) and not methname.startswith('_'):
                 template_meth = getattr(template_class, methname)
                 server_meth = getattr(server_class, methname)
@@ -930,6 +934,7 @@ class BaseProxy(object):
                         argtypes = argtypes,
                         argencoders = argencoders,
                         rvtype = rvtype,
+                        functools = functools,
                     )
                     context = dict()
                     
@@ -955,8 +960,15 @@ class BaseProxy(object):
                     
                     if doprop:
                         rv[methname] = property(context[methname])
+                        rv[dmethname] = property(context[dmethname])
                     else:
                         rv[methname] = context[methname]
+                        rv[dmethname] = context[dmethname]
+                    
+                    # inject _deferred into core classes
+                    if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
+                        setattr(template_class, dmethname, 
+                            getattr(template_class, methname))
         
         return rv
                         
index a4881f9..7161c6e 100644 (file)
@@ -15,3 +15,21 @@ def %(methname)s(%(self)s, %(argdefs)s):
         reply)
     return rv
 
+def %(methname)s_deferred(%(self)s, %(argdefs)s):
+    msg = BaseProxy._make_message(
+        argtypes,
+        argencoders,
+        %(command)d,
+        %(methname)r,
+        %(classname)r,
+        %(args)s)
+    %(self)s._client.send_msg(msg)
+    rv = %(self)s._client.defer_reply(
+        transform = functools.partial(
+            BaseProxy._parse_reply,
+            rvtype,
+            %(methname)r+'_deferred',
+            %(classname)r)
+        )
+    return rv
+
index d6079af..7f5a006 100644 (file)
@@ -16,6 +16,9 @@ import traceback
 import signal
 import re
 import tempfile
+import defer
+import functools
+import collections
 
 CTRL_SOCK = "ctrl.sock"
 STD_ERR = "stderr.log"
@@ -72,6 +75,7 @@ class Server(object):
         self._stop = False
         self._ctrl_sock = None
         self._log_level = log_level
+        self._rdbuf = ""
 
     def run(self):
         try:
@@ -195,8 +199,9 @@ class Server(object):
                 self.log_error()
 
     def recv_msg(self, conn):
-        data = ""
-        while True:
+        data = [self._rdbuf]
+        chunk = data[0]
+        while '\n' not in chunk:
             try:
                 chunk = conn.recv(1024)
             except OSError, e:
@@ -205,12 +210,15 @@ class Server(object):
                 if chunk == '':
                     continue
             if chunk:
-                data += chunk
-                if chunk[-1] == "\n":
-                    break
+                data.append(chunk)
             else:
                 # empty chunk = EOF
                 break
+        data = ''.join(data).split('\n',1)
+        while len(data) < 2:
+            data.append('')
+        data, self._rdbuf = data
+        
         decoded = base64.b64decode(data)
         return decoded.rstrip()
 
@@ -250,6 +258,7 @@ class Forwarder(object):
         self._ctrl_sock = None
         self._root_dir = root_dir
         self._stop = False
+        self._rdbuf = ""
 
     def forward(self):
         self.connect()
@@ -284,8 +293,9 @@ class Forwarder(object):
             self._stop = True
 
     def recv_from_server(self):
-        data = ""
-        while True:
+        data = [self._rdbuf]
+        chunk = data[0]
+        while '\n' not in chunk:
             try:
                 chunk = self._ctrl_sock.recv(1024)
             except OSError, e:
@@ -293,10 +303,17 @@ class Forwarder(object):
                     raise
                 if chunk == '':
                     continue
-            data += chunk
-            if chunk[-1] == "\n":
+            if chunk:
+                data.append(chunk)
+            else:
+                # empty chunk = EOF
                 break
-        return data
+        data = ''.join(data).split('\n',1)
+        while len(data) < 2:
+            data.append('')
+        data, self._rdbuf = data
+        
+        return data+'\n'
  
     def connect(self):
         self.disconnect()
@@ -319,6 +336,7 @@ class Client(object):
         self.agent = agent
         self.environment_setup = environment_setup
         self._stopped = False
+        self._deferreds = collections.deque()
         self.connect()
     
     def __del__(self):
@@ -377,10 +395,51 @@ class Client(object):
         self.send_msg(STOP_MSG)
         self._stopped = True
 
-    def read_reply(self):
+    def defer_reply(self, transform=None):
+        defer_entry = []
+        self._deferreds.append(defer_entry)
+        return defer.Defer(
+            functools.partial(self.read_reply, defer_entry, transform)
+        )
+        
+    def _read_reply(self):
         data = self._process.stdout.readline()
         encoded = data.rstrip() 
         return base64.b64decode(encoded)
+    
+    def read_reply(self, which=None, transform=None):
+        # Test to see if someone did it already
+        if which is not None and len(which):
+            # Ok, they did it...
+            # ...just return the deferred value
+            if transform:
+                return transform(which[0])
+            else:
+                return which[0]
+        
+        # Process all deferreds until the one we're looking for
+        # or until the queue is empty
+        while self._deferreds:
+            try:
+                deferred = self._deferreds.popleft()
+            except IndexError:
+                # emptied
+                break
+            
+            deferred.append(self._read_reply())
+            if deferred is which:
+                # We reached the one we were looking for
+                if transform:
+                    return transform(deferred[0])
+                else:
+                    return deferred[0]
+        
+        if which is None:
+            # They've requested a synchronous read
+            if transform:
+                return transform(self._read_reply())
+            else:
+                return self._read_reply()
 
 def _make_server_key_args(server_key, host, port, args):
     """