Allow command pipelining for more efficient communication.
return testbed.get(guid, name, time)
raise RuntimeError("No element exists with guid %d" % guid)
+ def get_deferred(self, guid, name, time = TIME_NOW):
+ testbed = self._testbed_for_guid(guid)
+ if testbed != None:
+ return testbed.get_deferred(guid, name, time)
+ raise RuntimeError("No element exists with guid %d" % guid)
+
def get_factory_id(self, guid):
testbed = self._testbed_for_guid(guid)
if testbed != None:
--- /dev/null
+class Defer:
+ class NONE:
+ pass
+
+ def __init__(self, ojetwait):
+ self.__ojet = Defer.NONE
+ self.__ojetwait = ojetwait
+ def __getattr__(self, attr):
+ if attr in ('_Defer__ojet', '_Defer__ojetwait', '_get'):
+ try:
+ return self.__dict__[attr]
+ except KeyError:
+ raise AttributeError, attr
+ else:
+ if self.__ojet is Defer.NONE:
+ self.__ojet = self.__ojetwait()
+ return getattr(self.__ojet, attr)
+ def __setattr__(self, attr, value):
+ if attr in ('_Defer__ojet', '_Defer__ojetwait'):
+ self.__dict__[attr] = value
+ else:
+ if self.__ojet is Defer.NONE:
+ self.__ojet = self.__ojetwait()
+ self.__ojetwait = None
+ return setattr(self.__ojet, attr, value)
+ def _get(self):
+ if self.__ojet is Defer.NONE:
+ self.__ojet = self.__ojetwait()
+ return self.__ojet
+
func_template = func_template_file.read()
func_template_file.close()
- for methname in vars(template_class):
+ for methname in vars(template_class).copy():
+ if methname.endswith('_deferred'):
+ # cannot wrap deferreds...
+ continue
+ dmethname = methname+'_deferred'
if hasattr(server_class, methname) and not methname.startswith('_'):
template_meth = getattr(template_class, methname)
server_meth = getattr(server_class, methname)
argtypes = argtypes,
argencoders = argencoders,
rvtype = rvtype,
+ functools = functools,
)
context = dict()
if doprop:
rv[methname] = property(context[methname])
+ rv[dmethname] = property(context[dmethname])
else:
rv[methname] = context[methname]
+ rv[dmethname] = context[dmethname]
+
+ # inject _deferred into core classes
+ if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
+ setattr(template_class, dmethname,
+ getattr(template_class, methname))
return rv
reply)
return rv
+def %(methname)s_deferred(%(self)s, %(argdefs)s):
+ msg = BaseProxy._make_message(
+ argtypes,
+ argencoders,
+ %(command)d,
+ %(methname)r,
+ %(classname)r,
+ %(args)s)
+ %(self)s._client.send_msg(msg)
+ rv = %(self)s._client.defer_reply(
+ transform = functools.partial(
+ BaseProxy._parse_reply,
+ rvtype,
+ %(methname)r+'_deferred',
+ %(classname)r)
+ )
+ return rv
+
import signal
import re
import tempfile
+import defer
+import functools
+import collections
CTRL_SOCK = "ctrl.sock"
STD_ERR = "stderr.log"
self._stop = False
self._ctrl_sock = None
self._log_level = log_level
+ self._rdbuf = ""
def run(self):
try:
self.log_error()
def recv_msg(self, conn):
- data = ""
- while True:
+ data = [self._rdbuf]
+ chunk = data[0]
+ while '\n' not in chunk:
try:
chunk = conn.recv(1024)
except OSError, e:
if chunk == '':
continue
if chunk:
- data += chunk
- if chunk[-1] == "\n":
- break
+ data.append(chunk)
else:
# empty chunk = EOF
break
+ data = ''.join(data).split('\n',1)
+ while len(data) < 2:
+ data.append('')
+ data, self._rdbuf = data
+
decoded = base64.b64decode(data)
return decoded.rstrip()
self._ctrl_sock = None
self._root_dir = root_dir
self._stop = False
+ self._rdbuf = ""
def forward(self):
self.connect()
self._stop = True
def recv_from_server(self):
- data = ""
- while True:
+ data = [self._rdbuf]
+ chunk = data[0]
+ while '\n' not in chunk:
try:
chunk = self._ctrl_sock.recv(1024)
except OSError, e:
raise
if chunk == '':
continue
- data += chunk
- if chunk[-1] == "\n":
+ if chunk:
+ data.append(chunk)
+ else:
+ # empty chunk = EOF
break
- return data
+ data = ''.join(data).split('\n',1)
+ while len(data) < 2:
+ data.append('')
+ data, self._rdbuf = data
+
+ return data+'\n'
def connect(self):
self.disconnect()
self.agent = agent
self.environment_setup = environment_setup
self._stopped = False
+ self._deferreds = collections.deque()
self.connect()
def __del__(self):
self.send_msg(STOP_MSG)
self._stopped = True
- def read_reply(self):
+ def defer_reply(self, transform=None):
+ defer_entry = []
+ self._deferreds.append(defer_entry)
+ return defer.Defer(
+ functools.partial(self.read_reply, defer_entry, transform)
+ )
+
+ def _read_reply(self):
data = self._process.stdout.readline()
encoded = data.rstrip()
return base64.b64decode(encoded)
+
+ def read_reply(self, which=None, transform=None):
+ # Test to see if someone did it already
+ if which is not None and len(which):
+ # Ok, they did it...
+ # ...just return the deferred value
+ if transform:
+ return transform(which[0])
+ else:
+ return which[0]
+
+ # Process all deferreds until the one we're looking for
+ # or until the queue is empty
+ while self._deferreds:
+ try:
+ deferred = self._deferreds.popleft()
+ except IndexError:
+ # emptied
+ break
+
+ deferred.append(self._read_reply())
+ if deferred is which:
+ # We reached the one we were looking for
+ if transform:
+ return transform(deferred[0])
+ else:
+ return deferred[0]
+
+ if which is None:
+ # They've requested a synchronous read
+ if transform:
+ return transform(self._read_reply())
+ else:
+ return self._read_reply()
def _make_server_key_args(server_key, host, port, args):
"""