-
- @property
- def guids(self):
- msg = testbed_messages[GUIDS]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- guids = cPickle.loads(text)
- return guids
-
- @property
- def testbed_id(self):
- msg = testbed_messages[TESTBED_ID]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return str(text)
-
- @property
- def testbed_version(self):
- msg = testbed_messages[TESTBED_VERSION]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return str(text)
-
- def defer_configure(self, name, value):
- msg = testbed_messages[CONFIGURE]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_create(self, guid, factory_id):
- msg = testbed_messages[CREATE]
- msg = msg % (guid, factory_id)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_create_set(self, guid, name, value):
- msg = testbed_messages[CREATE_SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (guid, name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_factory_set(self, guid, name, value):
- msg = testbed_messages[FACTORY_SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (guid, name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_connect(self, guid1, connector_type_name1, guid2,
- connector_type_name2):
- msg = testbed_messages[CONNECT]
- msg = msg % (guid1, connector_type_name1, guid2,
- connector_type_name2)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_cross_connect(self, guid, connector_type_name, cross_guid,
- cross_testbed_guid, cross_testbed_id, cross_factory_id,
- cross_connector_type_name):
- msg = testbed_messages[CROSS_CONNECT]
- msg = msg % (guid, connector_type_name, cross_guid, cross_testbed_guid,
- cross_testbed_id, cross_factory_id, cross_connector_type_name)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_trace(self, guid, trace_id):
- msg = testbed_messages[ADD_TRACE]
- msg = msg % (guid, trace_id)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_address(self, guid, address, netprefix, broadcast):
- msg = testbed_messages[ADD_ADDRESS]
- msg = msg % (guid, address, netprefix, broadcast)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_route(self, guid, destination, netprefix, nexthop):
- msg = testbed_messages[ADD_ROUTE]
- msg = msg % (guid, destination, netprefix, nexthop)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_setup(self):
- msg = testbed_messages[DO_SETUP]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_create(self):
- msg = testbed_messages[DO_CREATE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_connect_init(self):
- msg = testbed_messages[DO_CONNECT_INIT]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_connect_compl(self):
- msg = testbed_messages[DO_CONNECT_COMPL]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_configure(self):
- msg = testbed_messages[DO_CONFIGURE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_preconfigure(self):
- msg = testbed_messages[DO_PRECONFIGURE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_cross_connect_init(self, cross_data):
- msg = testbed_messages[DO_CROSS_CONNECT_INIT]
- pcross_data = cPickle.dumps(cross_data)
- cross_data = base64.b64encode(pcross_data)
- msg = msg % (cross_data)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_cross_connect_compl(self, cross_data):
- msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
- pcross_data = cPickle.dumps(cross_data)
- cross_data = base64.b64encode(pcross_data)
- msg = msg % (cross_data)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def start(self, time = TIME_NOW):
- msg = testbed_messages[START]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def stop(self, time = TIME_NOW):
- msg = testbed_messages[STOP]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def set(self, guid, name, value, time = TIME_NOW):
- msg = testbed_messages[SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (guid, name, value, type, time)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def get(self, guid, name, time = TIME_NOW):
- msg = testbed_messages[GET]
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- msg = msg % (guid, name, time)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def get_address(self, guid, index, attribute):
- msg = testbed_messages[GET_ADDRESS]
- # avoid having "|" in this parameters
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, index, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def get_route(self, guid, index, attribute):
- msg = testbed_messages[GET_ROUTE]
- # avoid having "|" in this parameters
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, index, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def action(self, time, guid, action):
- msg = testbed_messages[ACTION]
- msg = msg % (time, guid, action)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def status(self, guid = None):
- msg = testbed_messages[STATUS]
- msg = msg % (guid,)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return int(text)
-
- def trace(self, guid, trace_id, attribute='value'):
- msg = testbed_messages[TRACE]
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, trace_id, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def get_attribute_list(self, guid):
- msg = testbed_messages[GET_ATTRIBUTE_LIST]
- msg = msg % (guid,)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- attr_list = cPickle.loads(text)
- return attr_list
-
- def shutdown(self):
- msg = testbed_messages[SHUTDOWN]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+
+ @staticmethod
+ def _make_message(argtypes, argencoders, command, methname, classname, *args):
+ if len(argtypes) != len(argencoders):
+ raise ValueError, "Invalid arguments for _make_message: "\
+ "in stub method %s of class %s "\
+ "argtypes and argencoders must match in size" % (
+ methname, classname )
+ if len(argtypes) != len(args):
+ raise ValueError, "Invalid arguments for _make_message: "\
+ "in stub method %s of class %s "\
+ "expected %d arguments, got %d" % (
+ methname, classname,
+ len(argtypes), len(args))
+
+ buf = []
+ for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
+ try:
+ buf.append(fmt % encode(val))
+ except:
+ import traceback
+ raise TypeError, "Argument %d of stub method %s of class %s "\
+ "requires a value of type %s, but got %s - nested error: %s" % (
+ argnum, methname, classname,
+ getattr(typ, '__name__', typ), type(val),
+ traceback.format_exc()
+ )
+
+ return "%d|%s" % (command, '|'.join(buf))
+
+ @staticmethod
+ def _parse_reply(rvtype, methname, classname, reply):
+ if not reply:
+ raise RuntimeError, "Invalid reply: %r "\
+ "for stub method %s of class %s" % (
+ reply,
+ methname,
+ classname)
+
+ try:
+ result = reply.split("|")
+ code = int(result[0])
+ text = result[1]
+ except:
+ import traceback
+ raise TypeError, "Return value of stub method %s of class %s "\
+ "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
+ methname, classname,
+ getattr(rvtype, '__name__', rvtype), reply,
+ traceback.format_exc()
+ )
+ if code == ERROR:
+ text = base64.b64decode(text)
+ raise RuntimeError(text)
+ elif code == OK:
+ try:
+ if rvtype is None:
+ return
+ else:
+ return rvtype(text)
+ except:
+ import traceback
+ raise TypeError, "Return value of stub method %s of class %s "\
+ "cannot be parsed: must be of type %s - nested error: %s" % (
+ methname, classname,
+ getattr(rvtype, '__name__', rvtype),
+ traceback.format_exc()
+ )
+ else:
+ raise RuntimeError, "Invalid reply: %r "\
+ "for stub method %s of class %s - unknown code" % (
+ reply,
+ methname,
+ classname)
+
+ @staticmethod
+ def _make_stubs(server_class, template_class):
+ """
+ Returns a dictionary method_name -> method
+ with stub methods.
+
+ Usage:
+
+ class SomeProxy(BaseProxy):
+ ...
+
+ locals().update( BaseProxy._make_stubs(
+ ServerClass,
+ TemplateClass
+ ) )
+
+ ServerClass is the corresponding Server class, as
+ specified in the _ServerClass class method (_make_stubs
+ is static and can't access the method), and TemplateClass
+ is the ultimate implementation class behind the server,
+ from which argument names and defaults are taken, to
+ maintain meaningful interfaces.
+ """
+ rv = {}
+
+ class NONE: pass
+
+ import os.path
+ func_template_path = os.path.join(
+ os.path.dirname(__file__),
+ 'proxy_stub.tpl')
+ func_template_file = open(func_template_path, "r")
+ func_template = func_template_file.read()
+ func_template_file.close()
+
+ for methname in vars(template_class):
+ if hasattr(server_class, methname) and not methname.startswith('_'):
+ template_meth = getattr(template_class, methname)
+ server_meth = getattr(server_class, methname)
+
+ command = getattr(server_meth, '_handles_command', None)
+ argtypes = getattr(server_meth, '_argtypes', None)
+ argencoders = getattr(server_meth, '_argencoders', None)
+ rvtype = getattr(server_meth, '_retval', None)
+ doprop = False
+
+ if hasattr(template_meth, 'fget'):
+ # property getter
+ template_meth = template_meth.fget
+ doprop = True
+
+ if command is not None and argtypes is not None and argencoders is not None:
+ # We have an interface method...
+ code = template_meth.func_code
+ argnames = code.co_varnames[:code.co_argcount]
+ argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
+ + (template_meth.func_defaults or ()) )
+
+ func_globals = dict(
+ BaseProxy = BaseProxy,
+ argtypes = argtypes,
+ argencoders = argencoders,
+ rvtype = rvtype,
+ )
+ context = dict()
+
+ func_text = func_template % dict(
+ self = argnames[0],
+ args = '%s' % (','.join(argnames[1:])),
+ argdefs = ','.join([
+ argname if argdef is NONE
+ else "%s=%r" % (argname, argdef)
+ for argname, argdef in zip(argnames[1:], argdefaults[1:])
+ ]),
+ command = command,
+ methname = methname,
+ classname = server_class.__name__
+ )
+
+ func_text = compile(
+ func_text,
+ func_template_path,
+ 'exec')
+
+ exec func_text in func_globals, context
+
+ if doprop:
+ rv[methname] = property(context[methname])
+ else:
+ rv[methname] = context[methname]
+
+ return rv
+
+class TestbedControllerProxy(BaseProxy):
+
+ _ServerClass = TestbedControllerServer
+
+ def __init__(self, root_dir, log_level, testbed_id = None,
+ testbed_version = None, launch = True, host = None,
+ port = None, user = None, ident_key = None, agent = None,
+ environment_setup = ""):
+ if launch and (testbed_id == None or testbed_version == None):
+ raise RuntimeError("To launch a TesbedControllerServer a "
+ "testbed_id and testbed_version are required")
+ super(TestbedControllerProxy,self).__init__(
+ ctor_args = (root_dir, log_level, testbed_id, testbed_version),
+ root_dir = root_dir,
+ launch = launch, host = host, port = port, user = user,
+ ident_key = ident_key, agent = agent,
+ environment_setup = environment_setup)
+
+ locals().update( BaseProxy._make_stubs(
+ server_class = TestbedControllerServer,
+ template_class = nepi.core.execute.TestbedController,
+ ) )
+
+ # Shutdown stops the serverside...
+ def shutdown(self, _stub = shutdown):
+ rv = _stub(self)