From: Claudio-Daniel Freire Date: Thu, 12 May 2011 07:26:50 +0000 (+0200) Subject: Big refactoring of proxy code. X-Git-Tag: nepi_v2~51 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=d085eb3f8991a3475359a57fb13a84335e072832;p=nepi.git Big refactoring of proxy code. * Decorator-based serverside dispatch code which is easier to maintain * Automatic proxy generation using introspection on abstract interface (Proxy/ExperimentController) and serverside dispatch code. * Tried to maintain meaningful tracebacks * No more repeated code - stuff is only written once, so changes are easier to apply. * Changed marshalling for setters to use pickling instead of adding a fictious (and limited) type parameter. * Fixed some interface mismatches in the process --- diff --git a/setup.py b/setup.py index 024bca18..69109239 100755 --- a/setup.py +++ b/setup.py @@ -20,5 +20,6 @@ setup( "nepi.util.parser", "nepi.util" ], package_dir = {"": "src"}, - package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"] }, + package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"], + "nepi.util" : ["*.tpl"] }, ) diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index 9c4ee830..b67102a4 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -3,7 +3,7 @@ from nepi.core.attributes import Attribute, AttributesMap from nepi.core.connector import ConnectorTypeBase -from nepi.util import proxy, validation +from nepi.util import validation from nepi.util.constants import STATUS_FINISHED, TIME_NOW from nepi.util.parser._xml import XmlExperimentParser import sys @@ -199,8 +199,11 @@ class TestbedController(object): """Instructs creation of a connection between the given connectors""" raise NotImplementedError - def defer_cross_connect(self, guid, connector_type_name, cross_guid, - cross_testbed_id, cross_factory_id, cross_connector_type_name): + def defer_cross_connect(self, + guid, connector_type_name, + cross_guid, cross_testbed_guid, + cross_testbed_id, cross_factory_id, + cross_connector_type_name): """ Instructs creation of a connection between the given connectors of different testbed instances @@ -244,6 +247,14 @@ class TestbedController(object): """ raise NotImplementedError + def do_preconfigure(self): + """ + Done just before resolving netrefs, after connection, before cross connections, + useful for early stages of configuration, for setting up stuff that might be + required for netref resolution. + """ + raise NotImplementedError + def do_configure(self): """After do_configure elements are configured""" raise NotImplementedError @@ -461,6 +472,10 @@ class ExperimentController(object): BOOLEAN : 'getboolean', } + # deferred import because proxy needs + # our class definitions to define proxies + import nepi.util.proxy as proxy + conf = ConfigParser.RawConfigParser() conf.read(os.path.join(self._root_dir, 'deployment_config.ini')) for testbed_guid in conf.sections(): @@ -681,6 +696,10 @@ class ExperimentController(object): (testbed_id, testbed_version) = data.get_testbed_data(guid) deployment_config = self._deployment_config.get(guid) + # deferred import because proxy needs + # our class definitions to define proxies + import nepi.util.proxy as proxy + if deployment_config is None: # need to create one deployment_config = proxy.AccessConfiguration() diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index dcaae8bb..db5ce058 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- import base64 +import nepi.core.execute from nepi.core.attributes import AttributesMap, Attribute from nepi.util import server, validation from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC @@ -11,6 +12,7 @@ import sys import time import tempfile import shutil +import functools # PROTOCOL REPLIES OK = 0 @@ -54,60 +56,6 @@ TESTBED_VERSION = 36 EXPERIMENT_SET = 37 EXPERIMENT_GET = 38 -# PARAMETER TYPE -STRING = 100 -INTEGER = 101 -BOOL = 102 -FLOAT = 103 - -# EXPERIMENT CONTROLER PROTOCOL MESSAGES -controller_messages = dict({ - XML: "%d" % XML, - TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"), - FINISHED: "%d|%s" % (FINISHED, "%d"), - START: "%d" % START, - STOP: "%d" % STOP, - RECOVER : "%d" % RECOVER, - SHUTDOWN: "%d" % SHUTDOWN, - }) - -# TESTBED INSTANCE PROTOCOL MESSAGES -testbed_messages = dict({ - TRACE: "%d|%s" % (TRACE, "%d|%s|%s"), - START: "%d" % START, - STOP: "%d" % STOP, - SHUTDOWN: "%d" % SHUTDOWN, - CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"), - CREATE: "%d|%s" % (CREATE, "%d|%s"), - CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"), - FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"), - CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"), - CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s|%s"), - ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"), - ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"), - ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"), - DO_SETUP: "%d" % DO_SETUP, - DO_CREATE: "%d" % DO_CREATE, - DO_CONNECT_INIT: "%d" % DO_CONNECT_INIT, - DO_CONNECT_COMPL: "%d" % DO_CONNECT_COMPL, - DO_CONFIGURE: "%d" % DO_CONFIGURE, - DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE, - DO_CROSS_CONNECT_INIT: "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"), - DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"), - GET: "%d|%s" % (GET, "%d|%s|%s"), - SET: "%d|%s" % (SET, "%d|%s|%s|%d|%s"), - EXPERIMENT_GET: "%d|%s" % (EXPERIMENT_GET, "%d|%d|%s|%s"), - EXPERIMENT_SET: "%d|%s" % (EXPERIMENT_SET, "%d|%d|%s|%s|%d|%s"), - GET_ROUTE: "%d|%s" % (GET_ROUTE, "%d|%d|%s"), - GET_ADDRESS: "%d|%s" % (GET_ADDRESS, "%d|%d|%s"), - ACTION: "%d|%s" % (ACTION, "%s|%d|%s"), - STATUS: "%d|%s" % (STATUS, "%s"), - GUIDS: "%d" % GUIDS, - GET_ATTRIBUTE_LIST: "%d|%s" % (GET_ATTRIBUTE_LIST,"%d"), - TESTBED_ID: "%d" % TESTBED_ID, - TESTBED_VERSION: "%d" % TESTBED_VERSION, - }) - instruction_text = dict({ OK: "OK", ERROR: "ERROR", @@ -143,50 +91,38 @@ instruction_text = dict({ ACTION: "ACTION", STATUS: "STATUS", GUIDS: "GUIDS", - STRING: "STRING", - INTEGER: "INTEGER", - BOOL: "BOOL", - FLOAT: "FLOAT", TESTBED_ID: "TESTBED_ID", TESTBED_VERSION: "TESTBED_VERSION", EXPERIMENT_SET: "EXPERIMENT_SET", EXPERIMENT_GET: "EXPERIMENT_GET", }) -def get_type(value): - if isinstance(value, bool): - return BOOL - elif isinstance(value, int): - return INTEGER - elif isinstance(value, float): - return FLOAT - else: - return STRING - -def set_type(type, value): - if type == INTEGER: - value = int(value) - elif type == FLOAT: - value = float(value) - elif type == BOOL: - value = value == "True" - else: - value = str(value) - return value - def log_msg(server, params): - instr = int(params[0]) - instr_txt = instruction_text[instr] - server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, - instr_txt, ", ".join(map(str, params[1:])))) + try: + instr = int(params[0]) + instr_txt = instruction_text[instr] + server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, + instr_txt, ", ".join(map(str, params[1:])))) + except: + # don't die for logging + pass def log_reply(server, reply): - res = reply.split("|") - code = int(res[0]) - code_txt = instruction_text[code] - txt = base64.b64decode(res[1]) - server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, - code_txt, txt)) + try: + res = reply.split("|") + code = int(res[0]) + code_txt = instruction_text[code] + try: + txt = base64.b64decode(res[1]) + except: + txt = res[1] + server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, + code_txt, txt)) + except: + # don't die for logging + server.log_debug("%s - reply: %s" % (server.__class__.__name__, + reply)) + pass def to_server_log_level(log_level): return ( @@ -296,17 +232,177 @@ def _build_testbed_controller(testbed_id, testbed_version): module = sys.modules[mod_name] return module.TestbedController(testbed_version) -class TestbedControllerServer(server.Server): - def __init__(self, root_dir, log_level, testbed_id, testbed_version): - super(TestbedControllerServer, self).__init__(root_dir, log_level) - self._testbed_id = testbed_id - self._testbed_version = testbed_version - self._testbed = None - - def post_daemonize(self): - self._testbed = _build_testbed_controller(self._testbed_id, - self._testbed_version) - +# Just a namespace class +class Marshalling: + class Decoders: + @staticmethod + def pickled_data(sdata): + return cPickle.loads(base64.b64decode(sdata)) + + @staticmethod + def base64_data(sdata): + return base64.b64decode(sdata) + + @staticmethod + def nullint(sdata): + return None if sdata == "None" else int(sdata) + + @staticmethod + def bool(sdata): + return sdata == 'True' + + class Encoders: + @staticmethod + def pickled_data(data): + return base64.b64encode(cPickle.dumps(data)) + + @staticmethod + def base64_data(data): + return base64.b64encode(data) + + @staticmethod + def nullint(data): + return "None" if data is None else int(data) + + @staticmethod + def bool(data): + return str(bool(data)) + + # import into Marshalling all the decoders + # they act as types + locals().update([ + (typname, typ) + for typname, typ in vars(Decoders).iteritems() + if not typname.startswith('_') + ]) + + _TYPE_ENCODERS = dict([ + # id(type) -> (, ) + (typname, (getattr(Encoders,typname),"%s")) + for typname in vars(Decoders) + if not typname.startswith('_') + and hasattr(Encoders,typname) + ]) + + # Builtins + _TYPE_ENCODERS["float"] = (float, "%r") + _TYPE_ENCODERS["int"] = (int, "%d") + _TYPE_ENCODERS["long"] = (int, "%d") + _TYPE_ENCODERS["str"] = (str, "%s") + _TYPE_ENCODERS["unicode"] = (str, "%s") + + # Generic encoder + _TYPE_ENCODERS[None] = (str, "%s") + + @staticmethod + def args(*types): + """ + Decorator that converts the given function into one that takes + a single "params" list, with each parameter marshalled according + to the given factory callable (type constructors are accepted). + + The first argument (self) is left untouched. + + eg: + + @Marshalling.args(int,int,str,base64_data) + def somefunc(self, someint, otherint, somestr, someb64): + return someretval + """ + def decor(f): + @functools.wraps(f) + def rv(self, params): + return f(self, *[ ctor(val) + for ctor,val in zip(types, params[1:]) ]) + + rv._argtypes = types + + # Derive type encoders by looking up types in _TYPE_ENCODERS + # make_proxy will use it to encode arguments in command strings + argencoders = [] + TYPE_ENCODERS = Marshalling._TYPE_ENCODERS + for typ in types: + if typ.__name__ in TYPE_ENCODERS: + argencoders.append(TYPE_ENCODERS[typ.__name__]) + else: + # generic encoder + argencoders.append(TYPE_ENCODERS[None]) + + rv._argencoders = tuple(argencoders) + + rv._retval = getattr(f, '_retval', None) + return rv + return decor + + @staticmethod + def retval(typ=Decoders.base64_data): + """ + Decorator that converts the given function into one that + returns a properly encoded return string, given that the undecorated + function returns suitable input for the encoding function. + + The optional typ argument specifies a type. + For the default of base64_data, return values should be strings. + The return value of the encoding method should be a string always. + + eg: + + @Marshalling.args(int,int,str,base64_data) + @Marshalling.retval(str) + def somefunc(self, someint, otherint, somestr, someb64): + return someint + """ + encode, fmt = Marshalling._TYPE_ENCODERS.get( + typ.__name__, + Marshalling._TYPE_ENCODERS[None]) + fmt = "%d|"+fmt + + def decor(f): + @functools.wraps(f) + def rv(self, *p, **kw): + data = f(self, *p, **kw) + return fmt % ( + OK, + encode(data) + ) + rv._retval = typ + rv._argtypes = getattr(f, '_argtypes', None) + rv._argencoders = getattr(f, '_argencoders', None) + return rv + return decor + + @staticmethod + def retvoid(f): + """ + Decorator that converts the given function into one that + always return an encoded empty string. + + Useful for null-returning functions. + """ + OKRV = "%d|" % (OK,) + + @functools.wraps(f) + def rv(self, *p, **kw): + f(self, *p, **kw) + return OKRV + + rv._retval = None + rv._argtypes = getattr(f, '_argtypes', None) + rv._argencoders = getattr(f, '_argencoders', None) + return rv + + @staticmethod + def handles(whichcommand): + """ + Associates the method with a given command code for servers. + It should always be the topmost decorator. + """ + def decor(f): + f._handles_command = whichcommand + return f + return decor + +class BaseServer(server.Server): def reply_action(self, msg): if not msg: result = base64.b64encode("Invalid command line") @@ -316,68 +412,13 @@ class TestbedControllerServer(server.Server): instruction = int(params[0]) log_msg(self, params) try: - if instruction == TRACE: - reply = self.trace(params) - elif instruction == START: - reply = self.start(params) - elif instruction == STOP: - reply = self.stop(params) - elif instruction == SHUTDOWN: - reply = self.shutdown(params) - elif instruction == CONFIGURE: - reply = self.defer_configure(params) - elif instruction == CREATE: - reply = self.defer_create(params) - elif instruction == CREATE_SET: - reply = self.defer_create_set(params) - elif instruction == FACTORY_SET: - reply = self.defer_factory_set(params) - elif instruction == CONNECT: - reply = self.defer_connect(params) - elif instruction == CROSS_CONNECT: - reply = self.defer_cross_connect(params) - elif instruction == ADD_TRACE: - reply = self.defer_add_trace(params) - elif instruction == ADD_ADDRESS: - reply = self.defer_add_address(params) - elif instruction == ADD_ROUTE: - reply = self.defer_add_route(params) - elif instruction == DO_SETUP: - reply = self.do_setup(params) - elif instruction == DO_CREATE: - reply = self.do_create(params) - elif instruction == DO_CONNECT_INIT: - reply = self.do_connect_init(params) - elif instruction == DO_CONNECT_COMPL: - reply = self.do_connect_compl(params) - elif instruction == DO_CONFIGURE: - reply = self.do_configure(params) - elif instruction == DO_PRECONFIGURE: - reply = self.do_preconfigure(params) - elif instruction == DO_CROSS_CONNECT_INIT: - reply = self.do_cross_connect_init(params) - elif instruction == DO_CROSS_CONNECT_COMPL: - reply = self.do_cross_connect_compl(params) - elif instruction == GET: - reply = self.get(params) - elif instruction == SET: - reply = self.set(params) - elif instruction == GET_ADDRESS: - reply = self.get_address(params) - elif instruction == GET_ROUTE: - reply = self.get_route(params) - elif instruction == ACTION: - reply = self.action(params) - elif instruction == STATUS: - reply = self.status(params) - elif instruction == GUIDS: - reply = self.guids(params) - elif instruction == GET_ATTRIBUTE_LIST: - reply = self.get_attribute_list(params) - elif instruction == TESTBED_ID: - reply = self.testbed_id(params) - elif instruction == TESTBED_VERSION: - reply = self.testbed_version(params) + for mname,meth in vars(self.__class__).iteritems(): + if not mname.startswith('_'): + cmd = getattr(meth, '_handles_command', None) + if cmd == instruction: + meth = getattr(self, mname) + reply = meth(params) + break else: error = "Invalid instruction %s" % instruction self.log_error(error) @@ -390,211 +431,212 @@ class TestbedControllerServer(server.Server): log_reply(self, reply) return reply - def guids(self, params): - guids = self._testbed.guids - value = cPickle.dumps(guids) - result = base64.b64encode(value) - return "%d|%s" % (OK, result) - - def testbed_id(self, params): - testbed_id = self._testbed.testbed_id - result = base64.b64encode(str(testbed_id)) - return "%d|%s" % (OK, result) - - def testbed_version(self, params): - testbed_version = self._testbed.testbed_version - result = base64.b64encode(str(testbed_version)) - return "%d|%s" % (OK, result) - - def defer_create(self, params): - guid = int(params[1]) - factory_id = params[2] +class TestbedControllerServer(BaseServer): + def __init__(self, root_dir, log_level, testbed_id, testbed_version): + super(TestbedControllerServer, self).__init__(root_dir, log_level) + self._testbed_id = testbed_id + self._testbed_version = testbed_version + self._testbed = None + + def post_daemonize(self): + self._testbed = _build_testbed_controller(self._testbed_id, + self._testbed_version) + + @Marshalling.handles(GUIDS) + @Marshalling.args() + @Marshalling.retval( Marshalling.pickled_data ) + def guids(self): + return self._testbed.guids + + @Marshalling.handles(TESTBED_ID) + @Marshalling.args() + @Marshalling.retval() + def testbed_id(self): + return str(self._testbed.testbed_id) + + @Marshalling.handles(TESTBED_VERSION) + @Marshalling.args() + @Marshalling.retval() + def testbed_version(self): + return str(self._testbed.testbed_version) + + @Marshalling.handles(CREATE) + @Marshalling.args(int, str) + @Marshalling.retvoid + def defer_create(self, guid, factory_id): self._testbed.defer_create(guid, factory_id) - return "%d|%s" % (OK, "") - def trace(self, params): - guid = int(params[1]) - trace_id = params[2] - attribute = base64.b64decode(params[3]) - trace = self._testbed.trace(guid, trace_id, attribute) - result = base64.b64encode(trace) - return "%d|%s" % (OK, result) + @Marshalling.handles(TRACE) + @Marshalling.args(int, str, Marshalling.base64_data) + @Marshalling.retval() + def trace(self, guid, trace_id, attribute): + return self._testbed.trace(guid, trace_id, attribute) - def start(self, params): + @Marshalling.handles(START) + @Marshalling.args() + @Marshalling.retvoid + def start(self): self._testbed.start() - return "%d|%s" % (OK, "") - def stop(self, params): + @Marshalling.handles(STOP) + @Marshalling.args() + @Marshalling.retvoid + def stop(self): self._testbed.stop() - return "%d|%s" % (OK, "") - def shutdown(self, params): + @Marshalling.handles(SHUTDOWN) + @Marshalling.args() + @Marshalling.retvoid + def shutdown(self): self._testbed.shutdown() - return "%d|%s" % (OK, "") - def defer_configure(self, params): - name = base64.b64decode(params[1]) - value = base64.b64decode(params[2]) - type = int(params[3]) - value = set_type(type, value) + @Marshalling.handles(CONFIGURE) + @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data) + @Marshalling.retvoid + def defer_configure(self, name, value): self._testbed.defer_configure(name, value) - return "%d|%s" % (OK, "") - - def defer_create_set(self, params): - guid = int(params[1]) - name = base64.b64decode(params[2]) - value = base64.b64decode(params[3]) - type = int(params[4]) - value = set_type(type, value) + + @Marshalling.handles(CREATE_SET) + @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data) + @Marshalling.retvoid + def defer_create_set(self, guid, name, value): self._testbed.defer_create_set(guid, name, value) - return "%d|%s" % (OK, "") - def defer_factory_set(self, params): - name = base64.b64decode(params[1]) - value = base64.b64decode(params[2]) - type = int(params[3]) - value = set_type(type, value) + @Marshalling.handles(FACTORY_SET) + @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data) + @Marshalling.retvoid + def defer_factory_set(self, name, value): self._testbed.defer_factory_set(name, value) - return "%d|%s" % (OK, "") - def defer_connect(self, params): - guid1 = int(params[1]) - connector_type_name1 = params[2] - guid2 = int(params[3]) - connector_type_name2 = params[4] + @Marshalling.handles(CONNECT) + @Marshalling.args(int, str, int, str) + @Marshalling.retvoid + def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2): self._testbed.defer_connect(guid1, connector_type_name1, guid2, connector_type_name2) - return "%d|%s" % (OK, "") - - def defer_cross_connect(self, params): - guid = int(params[1]) - connector_type_name = params[2] - cross_guid = int(params[3]) - cross_testbed_guid = int(params[4]) - cross_testbed_id = params[5] - cross_factory_id = params[6] - cross_connector_type_name = params[7] + + @Marshalling.handles(CROSS_CONNECT) + @Marshalling.args(int, str, int, int, str, str, str) + @Marshalling.retvoid + def defer_cross_connect(self, + guid, connector_type_name, + cross_guid, cross_testbed_guid, + cross_testbed_id, cross_factory_id, + cross_connector_type_name): self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid, cross_testbed_guid, cross_testbed_id, cross_factory_id, cross_connector_type_name) - return "%d|%s" % (OK, "") - def defer_add_trace(self, params): - guid = int(params[1]) - trace_id = params[2] + @Marshalling.handles(ADD_TRACE) + @Marshalling.args(int, str) + @Marshalling.retvoid + def defer_add_trace(self, guid, trace_id): self._testbed.defer_add_trace(guid, trace_id) - return "%d|%s" % (OK, "") - def defer_add_address(self, params): - guid = int(params[1]) - address = params[2] - netprefix = int(params[3]) - broadcast = params[4] + @Marshalling.handles(ADD_ADDRESS) + @Marshalling.args(int, str, int, str) + @Marshalling.retvoid + def defer_add_address(self, guid, address, netprefix, broadcast): self._testbed.defer_add_address(guid, address, netprefix, broadcast) - return "%d|%s" % (OK, "") - def defer_add_route(self, params): - guid = int(params[1]) - destination = params[2] - netprefix = int(params[3]) - nexthop = params[4] + @Marshalling.handles(ADD_ROUTE) + @Marshalling.args(int, str, int, str) + @Marshalling.retvoid + def defer_add_route(self, guid, destination, netprefix, nexthop): self._testbed.defer_add_route(guid, destination, netprefix, nexthop) - return "%d|%s" % (OK, "") - def do_setup(self, params): + @Marshalling.handles(DO_SETUP) + @Marshalling.args() + @Marshalling.retvoid + def do_setup(self): self._testbed.do_setup() - return "%d|%s" % (OK, "") - def do_create(self, params): + @Marshalling.handles(DO_CREATE) + @Marshalling.args() + @Marshalling.retvoid + def do_create(self): self._testbed.do_create() - return "%d|%s" % (OK, "") - def do_connect_init(self, params): + @Marshalling.handles(DO_CONNECT_INIT) + @Marshalling.args() + @Marshalling.retvoid + def do_connect_init(self): self._testbed.do_connect_init() - return "%d|%s" % (OK, "") - def do_connect_compl(self, params): + @Marshalling.handles(DO_CONNECT_COMPL) + @Marshalling.args() + @Marshalling.retvoid + def do_connect_compl(self): self._testbed.do_connect_compl() - return "%d|%s" % (OK, "") - def do_configure(self, params): + @Marshalling.handles(DO_CONFIGURE) + @Marshalling.args() + @Marshalling.retvoid + def do_configure(self): self._testbed.do_configure() - return "%d|%s" % (OK, "") - def do_preconfigure(self, params): + @Marshalling.handles(DO_PRECONFIGURE) + @Marshalling.args() + @Marshalling.retvoid + def do_preconfigure(self): self._testbed.do_preconfigure() - return "%d|%s" % (OK, "") - def do_cross_connect_init(self, params): - pcross_data = base64.b64decode(params[1]) - cross_data = cPickle.loads(pcross_data) + @Marshalling.handles(DO_CROSS_CONNECT_INIT) + @Marshalling.args( Marshalling.Decoders.pickled_data ) + @Marshalling.retvoid + def do_cross_connect_init(self, cross_data): self._testbed.do_cross_connect_init(cross_data) - return "%d|%s" % (OK, "") - def do_cross_connect_compl(self, params): - pcross_data = base64.b64decode(params[1]) - cross_data = cPickle.loads(pcross_data) + @Marshalling.handles(DO_CROSS_CONNECT_COMPL) + @Marshalling.args( Marshalling.Decoders.pickled_data ) + @Marshalling.retvoid + def do_cross_connect_compl(self, cross_data): self._testbed.do_cross_connect_compl(cross_data) - return "%d|%s" % (OK, "") - - def get(self, params): - guid = int(params[1]) - name = base64.b64decode(params[2]) - time = params[3] - value = self._testbed.get(guid, name, time) - result = base64.b64encode(str(value)) - return "%d|%s" % (OK, result) - - def set(self, params): - guid = int(params[1]) - name = base64.b64decode(params[2]) - value = base64.b64decode(params[3]) - type = int(params[2]) - time = params[4] - value = set_type(type, value) + + @Marshalling.handles(GET) + @Marshalling.args(int, Marshalling.base64_data, str) + @Marshalling.retval() + def get(self, guid, name, time): + return str(self._testbed.get(guid, name, time)) + + @Marshalling.handles(SET) + @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str) + @Marshalling.retvoid + def set(self, guid, name, value, time): self._testbed.set(guid, name, value, time) - return "%d|%s" % (OK, "") - - def get_address(self, params): - guid = int(params[1]) - index = int(params[2]) - attribute = base64.b64decode(params[3]) - value = self._testbed.get_address(guid, index, attribute) - result = base64.b64encode(str(value)) - return "%d|%s" % (OK, result) - - def get_route(self, params): - guid = int(params[1]) - index = int(params[2]) - attribute = base64.b64decode(params[3]) - value = self._testbed.get_route(guid, index, attribute) - result = base64.b64encode(str(value)) - return "%d|%s" % (OK, result) - - def action(self, params): - time = params[1] - guid = int(params[2]) - command = base64.b64decode(params[3]) + + @Marshalling.handles(GET_ADDRESS) + @Marshalling.args(int, int, Marshalling.base64_data) + @Marshalling.retval() + def get_address(self, guid, index, attribute): + return str(self._testbed.get_address(guid, index, attribute)) + + @Marshalling.handles(GET_ROUTE) + @Marshalling.args(int, int, Marshalling.base64_data) + @Marshalling.retval() + def get_route(self, guid, index, attribute): + return str(self._testbed.get_route(guid, index, attribute)) + + @Marshalling.handles(ACTION) + @Marshalling.args(str, int, Marshalling.base64_data) + @Marshalling.retvoid + def action(self, time, guid, command): self._testbed.action(time, guid, command) - return "%d|%s" % (OK, "") - - def status(self, params): - guid = None - if params[1] != "None": - guid = int(params[1]) - status = self._testbed.status(guid) - result = base64.b64encode(str(status)) - return "%d|%s" % (OK, result) - - def get_attribute_list(self, params): - guid = int(params[1]) - attr_list = self._testbed.get_attribute_list(guid) - value = cPickle.dumps(attr_list) - result = base64.b64encode(value) - return "%d|%s" % (OK, result) - -class ExperimentControllerServer(server.Server): + + @Marshalling.handles(STATUS) + @Marshalling.args(Marshalling.nullint) + @Marshalling.retval(int) + def status(self, guid): + return self._testbed.status(guid) + + @Marshalling.handles(GET_ATTRIBUTE_LIST) + @Marshalling.args(int) + @Marshalling.retval( Marshalling.pickled_data ) + def get_attribute_list(self, guid): + return self._testbed.get_attribute_list(guid) + +class ExperimentControllerServer(BaseServer): def __init__(self, root_dir, log_level, experiment_xml): super(ExperimentControllerServer, self).__init__(root_dir, log_level) self._experiment_xml = experiment_xml @@ -605,117 +647,81 @@ class ExperimentControllerServer(server.Server): self._controller = ExperimentController(self._experiment_xml, root_dir = self._root_dir) - def reply_action(self, msg): - if not msg: - result = base64.b64encode("Invalid command line") - reply = "%d|%s" % (ERROR, result) - else: - params = msg.split("|") - instruction = int(params[0]) - log_msg(self, params) - try: - if instruction == XML: - reply = self.experiment_xml(params) - elif instruction == TRACE: - reply = self.trace(params) - elif instruction == FINISHED: - reply = self.is_finished(params) - elif instruction == EXPERIMENT_GET: - reply = self.get(params) - elif instruction == EXPERIMENT_SET: - reply = self.set(params) - elif instruction == START: - reply = self.start(params) - elif instruction == STOP: - reply = self.stop(params) - elif instruction == RECOVER: - reply = self.recover(params) - elif instruction == SHUTDOWN: - reply = self.shutdown(params) - else: - error = "Invalid instruction %s" % instruction - self.log_error(error) - result = base64.b64encode(error) - reply = "%d|%s" % (ERROR, result) - except: - error = self.log_error() - result = base64.b64encode(error) - reply = "%d|%s" % (ERROR, result) - log_reply(self, reply) - return reply - - def experiment_xml(self, params): - xml = self._controller.experiment_xml - result = base64.b64encode(xml) - return "%d|%s" % (OK, result) + @Marshalling.handles(XML) + @Marshalling.args() + @Marshalling.retval() + def experiment_xml(self): + return self._controller.experiment_xml - def trace(self, params): - testbed_guid = int(params[1]) - guid = int(params[2]) - trace_id = params[3] - attribute = base64.b64decode(params[4]) - trace = self._controller.trace(testbed_guid, guid, trace_id, attribute) - result = base64.b64encode(trace) - return "%d|%s" % (OK, result) - - def is_finished(self, params): - guid = int(params[1]) - status = self._controller.is_finished(guid) - result = base64.b64encode(str(status)) - return "%d|%s" % (OK, result) - - def get(self, params): - testbed_guid = int(param[1]) - guid = int(params[2]) - name = base64.b64decode(params[3]) - value = self._controller.get(testbed_guid, guid, name, time) - time = params[4] - result = base64.b64encode(str(value)) - return "%d|%s" % (OK, result) - - def set(self, params): - testbed_guid = int(params[1]) - guid = int(params[2]) - name = base64.b64decode(params[3]) - value = base64.b64decode(params[4]) - type = int(params[3]) - time = params[5] - value = set_type(type, value) + @Marshalling.handles(TRACE) + @Marshalling.args(int, int, str, Marshalling.base64_data) + @Marshalling.retval() + def trace(self, testbed_guid, guid, trace_id, attribute): + return str(self._controller.trace(testbed_guid, guid, trace_id, attribute)) + + @Marshalling.handles(FINISHED) + @Marshalling.args(int) + @Marshalling.retval(Marshalling.bool) + def is_finished(self, guid): + return self._controller.is_finished(guid) + + @Marshalling.handles(EXPERIMENT_GET) + @Marshalling.args(int, int, Marshalling.base64_data, str) + @Marshalling.retval() + def get(self, testbed_guid, guid, name, time): + return str(self._controller.get(testbed_guid, guid, name, time)) + + @Marshalling.handles(EXPERIMENT_SET) + @Marshalling.args(int, int, Marshalling.base64_data, Marshalling.pickled_data, str) + @Marshalling.retvoid + def set(self, testbed_guid, guid, name, value, time): self._controller.set(testbed_guid, guid, name, value, time) - return "%d|%s" % (OK, "") - def start(self, params): + @Marshalling.handles(START) + @Marshalling.args() + @Marshalling.retvoid + def start(self): self._controller.start() - return "%d|%s" % (OK, "") - def stop(self, params): + @Marshalling.handles(STOP) + @Marshalling.args() + @Marshalling.retvoid + def stop(self): self._controller.stop() - return "%d|%s" % (OK, "") - def recover(self, params): + @Marshalling.handles(RECOVER) + @Marshalling.args() + @Marshalling.retvoid + def recover(self): self._controller.recover() - return "%d|%s" % (OK, "") - def shutdown(self, params): + @Marshalling.handles(SHUTDOWN) + @Marshalling.args() + @Marshalling.retvoid + def shutdown(self): self._controller.shutdown() - return "%d|%s" % (OK, "") -class TestbedControllerProxy(object): - def __init__(self, root_dir, log_level, testbed_id = None, - testbed_version = None, launch = True, host = None, +class BaseProxy(object): + _ServerClass = None + _ServerClassModule = "nepi.util.proxy" + + def __init__(self, + ctor_args, root_dir, + launch = True, host = None, port = None, user = None, ident_key = None, agent = None, environment_setup = ""): if launch: - if testbed_id == None or testbed_version == None: - raise RuntimeError("To launch a TesbedInstance server a \ - testbed_id and testbed_version are required") # ssh if host != None: - python_code = "from nepi.util.proxy import "\ - "TestbedControllerServer;"\ - "s = TestbedControllerServer('%s', %d, '%s', '%s');"\ - "s.run()" % (root_dir, log_level, testbed_id, - testbed_version) + python_code = ( + "from %(classmodule)s import %(classname)s;" + "s = %(classname)s%(ctor_args)r;" + "s.run()" + % dict( + classname = self._ServerClass.__name__, + classmodule = self._ServerClassModule, + ctor_args = ctor_args + ) ) proc = server.popen_ssh_subprocess(python_code, host = host, port = port, user = user, agent = agent, ident_key = ident_key, @@ -723,615 +729,241 @@ class TestbedControllerProxy(object): waitcommand = True) if proc.poll(): err = proc.stderr.read() - raise RuntimeError("Server could not be executed: %s" % \ - err) + raise RuntimeError, "Server could not be executed: %s" % (err,) else: # launch daemon - s = TestbedControllerServer(root_dir, log_level, testbed_id, - testbed_version) + s = self._ServerClass(*ctor_args) s.run() # connect client to server self._client = server.Client(root_dir, host = host, port = port, user = user, agent = agent, environment_setup = environment_setup) - - @property - def guids(self): - msg = testbed_messages[GUIDS] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - guids = cPickle.loads(text) - return guids - - @property - def testbed_id(self): - msg = testbed_messages[TESTBED_ID] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return str(text) - - @property - def testbed_version(self): - msg = testbed_messages[TESTBED_VERSION] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return str(text) - - def defer_configure(self, name, value): - msg = testbed_messages[CONFIGURE] - type = get_type(value) - # avoid having "|" in this parameters - name = base64.b64encode(name) - value = base64.b64encode(str(value)) - msg = msg % (name, value, type) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def defer_create(self, guid, factory_id): - msg = testbed_messages[CREATE] - msg = msg % (guid, factory_id) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def defer_create_set(self, guid, name, value): - msg = testbed_messages[CREATE_SET] - type = get_type(value) - # avoid having "|" in this parameters - name = base64.b64encode(name) - value = base64.b64encode(str(value)) - msg = msg % (guid, name, value, type) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def defer_factory_set(self, guid, name, value): - msg = testbed_messages[FACTORY_SET] - type = get_type(value) - # avoid having "|" in this parameters - name = base64.b64encode(name) - value = base64.b64encode(str(value)) - msg = msg % (guid, name, value, type) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def defer_connect(self, guid1, connector_type_name1, guid2, - connector_type_name2): - msg = testbed_messages[CONNECT] - msg = msg % (guid1, connector_type_name1, guid2, - connector_type_name2) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def defer_cross_connect(self, guid, connector_type_name, cross_guid, - cross_testbed_guid, cross_testbed_id, cross_factory_id, - cross_connector_type_name): - msg = testbed_messages[CROSS_CONNECT] - msg = msg % (guid, connector_type_name, cross_guid, cross_testbed_guid, - cross_testbed_id, cross_factory_id, cross_connector_type_name) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def defer_add_trace(self, guid, trace_id): - msg = testbed_messages[ADD_TRACE] - msg = msg % (guid, trace_id) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def defer_add_address(self, guid, address, netprefix, broadcast): - msg = testbed_messages[ADD_ADDRESS] - msg = msg % (guid, address, netprefix, broadcast) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def defer_add_route(self, guid, destination, netprefix, nexthop): - msg = testbed_messages[ADD_ROUTE] - msg = msg % (guid, destination, netprefix, nexthop) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def do_setup(self): - msg = testbed_messages[DO_SETUP] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def do_create(self): - msg = testbed_messages[DO_CREATE] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def do_connect_init(self): - msg = testbed_messages[DO_CONNECT_INIT] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def do_connect_compl(self): - msg = testbed_messages[DO_CONNECT_COMPL] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def do_configure(self): - msg = testbed_messages[DO_CONFIGURE] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def do_preconfigure(self): - msg = testbed_messages[DO_PRECONFIGURE] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def do_cross_connect_init(self, cross_data): - msg = testbed_messages[DO_CROSS_CONNECT_INIT] - pcross_data = cPickle.dumps(cross_data) - cross_data = base64.b64encode(pcross_data) - msg = msg % (cross_data) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def do_cross_connect_compl(self, cross_data): - msg = testbed_messages[DO_CROSS_CONNECT_COMPL] - pcross_data = cPickle.dumps(cross_data) - cross_data = base64.b64encode(pcross_data) - msg = msg % (cross_data) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def start(self, time = TIME_NOW): - msg = testbed_messages[START] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def stop(self, time = TIME_NOW): - msg = testbed_messages[STOP] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def set(self, guid, name, value, time = TIME_NOW): - msg = testbed_messages[SET] - type = get_type(value) - # avoid having "|" in this parameters - name = base64.b64encode(name) - value = base64.b64encode(str(value)) - msg = msg % (guid, name, value, type, time) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def get(self, guid, name, time = TIME_NOW): - msg = testbed_messages[GET] - # avoid having "|" in this parameters - name = base64.b64encode(name) - msg = msg % (guid, name, time) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return text - - def get_address(self, guid, index, attribute): - msg = testbed_messages[GET_ADDRESS] - # avoid having "|" in this parameters - attribute = base64.b64encode(attribute) - msg = msg % (guid, index, attribute) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return text - - def get_route(self, guid, index, attribute): - msg = testbed_messages[GET_ROUTE] - # avoid having "|" in this parameters - attribute = base64.b64encode(attribute) - msg = msg % (guid, index, attribute) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return text - - def action(self, time, guid, action): - msg = testbed_messages[ACTION] - msg = msg % (time, guid, action) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def status(self, guid = None): - msg = testbed_messages[STATUS] - msg = msg % (guid,) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return int(text) - - def trace(self, guid, trace_id, attribute='value'): - msg = testbed_messages[TRACE] - attribute = base64.b64encode(attribute) - msg = msg % (guid, trace_id, attribute) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return text - - def get_attribute_list(self, guid): - msg = testbed_messages[GET_ATTRIBUTE_LIST] - msg = msg % (guid,) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - attr_list = cPickle.loads(text) - return attr_list - - def shutdown(self): - msg = testbed_messages[SHUTDOWN] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) + + @staticmethod + def _make_message(argtypes, argencoders, command, methname, classname, *args): + if len(argtypes) != len(argencoders): + raise ValueError, "Invalid arguments for _make_message: "\ + "in stub method %s of class %s "\ + "argtypes and argencoders must match in size" % ( + methname, classname ) + if len(argtypes) != len(args): + raise ValueError, "Invalid arguments for _make_message: "\ + "in stub method %s of class %s "\ + "expected %d arguments, got %d" % ( + methname, classname, + len(argtypes), len(args)) + + buf = [] + for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)): + try: + buf.append(fmt % encode(val)) + except: + import traceback + raise TypeError, "Argument %d of stub method %s of class %s "\ + "requires a value of type %s, but got %s - nested error: %s" % ( + argnum, methname, classname, + getattr(typ, '__name__', typ), type(val), + traceback.format_exc() + ) + + return "%d|%s" % (command, '|'.join(buf)) + + @staticmethod + def _parse_reply(rvtype, methname, classname, reply): + if not reply: + raise RuntimeError, "Invalid reply: %r "\ + "for stub method %s of class %s" % ( + reply, + methname, + classname) + + try: + result = reply.split("|") + code = int(result[0]) + text = result[1] + except: + import traceback + raise TypeError, "Return value of stub method %s of class %s "\ + "cannot be parsed: must be of type %s, got %r - nested error: %s" % ( + methname, classname, + getattr(rvtype, '__name__', rvtype), reply, + traceback.format_exc() + ) + if code == ERROR: + text = base64.b64decode(text) + raise RuntimeError(text) + elif code == OK: + try: + if rvtype is None: + return + else: + return rvtype(text) + except: + import traceback + raise TypeError, "Return value of stub method %s of class %s "\ + "cannot be parsed: must be of type %s - nested error: %s" % ( + methname, classname, + getattr(rvtype, '__name__', rvtype), + traceback.format_exc() + ) + else: + raise RuntimeError, "Invalid reply: %r "\ + "for stub method %s of class %s - unknown code" % ( + reply, + methname, + classname) + + @staticmethod + def _make_stubs(server_class, template_class): + """ + Returns a dictionary method_name -> method + with stub methods. + + Usage: + + class SomeProxy(BaseProxy): + ... + + locals().update( BaseProxy._make_stubs( + ServerClass, + TemplateClass + ) ) + + ServerClass is the corresponding Server class, as + specified in the _ServerClass class method (_make_stubs + is static and can't access the method), and TemplateClass + is the ultimate implementation class behind the server, + from which argument names and defaults are taken, to + maintain meaningful interfaces. + """ + rv = {} + + class NONE: pass + + import os.path + func_template_path = os.path.join( + os.path.dirname(__file__), + 'proxy_stub.tpl') + func_template_file = open(func_template_path, "r") + func_template = func_template_file.read() + func_template_file.close() + + for methname in vars(template_class): + if hasattr(server_class, methname) and not methname.startswith('_'): + template_meth = getattr(template_class, methname) + server_meth = getattr(server_class, methname) + + command = getattr(server_meth, '_handles_command', None) + argtypes = getattr(server_meth, '_argtypes', None) + argencoders = getattr(server_meth, '_argencoders', None) + rvtype = getattr(server_meth, '_retval', None) + doprop = False + + if hasattr(template_meth, 'fget'): + # property getter + template_meth = template_meth.fget + doprop = True + + if command is not None and argtypes is not None and argencoders is not None: + # We have an interface method... + code = template_meth.func_code + argnames = code.co_varnames[:code.co_argcount] + argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ())) + + (template_meth.func_defaults or ()) ) + + func_globals = dict( + BaseProxy = BaseProxy, + argtypes = argtypes, + argencoders = argencoders, + rvtype = rvtype, + ) + context = dict() + + func_text = func_template % dict( + self = argnames[0], + args = '%s' % (','.join(argnames[1:])), + argdefs = ','.join([ + argname if argdef is NONE + else "%s=%r" % (argname, argdef) + for argname, argdef in zip(argnames[1:], argdefaults[1:]) + ]), + command = command, + methname = methname, + classname = server_class.__name__ + ) + + func_text = compile( + func_text, + func_template_path, + 'exec') + + exec func_text in func_globals, context + + if doprop: + rv[methname] = property(context[methname]) + else: + rv[methname] = context[methname] + + return rv + +class TestbedControllerProxy(BaseProxy): + + _ServerClass = TestbedControllerServer + + def __init__(self, root_dir, log_level, testbed_id = None, + testbed_version = None, launch = True, host = None, + port = None, user = None, ident_key = None, agent = None, + environment_setup = ""): + if launch and (testbed_id == None or testbed_version == None): + raise RuntimeError("To launch a TesbedControllerServer a " + "testbed_id and testbed_version are required") + super(TestbedControllerProxy,self).__init__( + ctor_args = (root_dir, log_level, testbed_id, testbed_version), + root_dir = root_dir, + launch = launch, host = host, port = port, user = user, + ident_key = ident_key, agent = agent, + environment_setup = environment_setup) + + locals().update( BaseProxy._make_stubs( + server_class = TestbedControllerServer, + template_class = nepi.core.execute.TestbedController, + ) ) + + # Shutdown stops the serverside... + def shutdown(self, _stub = shutdown): + rv = _stub(self) self._client.send_stop() self._client.read_reply() # wait for it + return rv + -class ExperimentControllerProxy(object): +class ExperimentControllerProxy(BaseProxy): + _ServerClass = ExperimentControllerServer + def __init__(self, root_dir, log_level, experiment_xml = None, launch = True, host = None, port = None, user = None, ident_key = None, agent = None, environment_setup = ""): - if launch: - # launch server - if experiment_xml == None: - raise RuntimeError("To launch a ExperimentControllerServer a \ - xml description of the experiment is required") - # ssh - if host != None: - xml = experiment_xml - python_code = "from nepi.util.proxy import ExperimentControllerServer;\ - s = ExperimentControllerServer(%r, %r, %r);\ - s.run()" % (root_dir, log_level, xml) - proc = server.popen_ssh_subprocess(python_code, host = host, - port = port, user = user, agent = agent, - ident_key = ident_key, - environment_setup = environment_setup, - waitcommand = True) - if proc.poll(): - err = proc.stderr.read() - raise RuntimeError("Server could not be executed: %s" % \ - err) - else: - # launch daemon - s = ExperimentControllerServer(root_dir, log_level, experiment_xml) - s.run() + if launch and experiment_xml is None: + raise RuntimeError("To launch a ExperimentControllerServer a \ + xml description of the experiment is required") + super(ExperimentControllerProxy,self).__init__( + ctor_args = (root_dir, log_level, experiment_xml), + root_dir = root_dir, + launch = launch, host = host, port = port, user = user, + ident_key = ident_key, agent = agent, + environment_setup = environment_setup) + + locals().update( BaseProxy._make_stubs( + server_class = ExperimentControllerServer, + template_class = nepi.core.execute.ExperimentController, + ) ) - # connect client to server - self._client = server.Client(root_dir, host = host, port = port, - user = user, agent = agent, - environment_setup = environment_setup) - - @property - def experiment_xml(self): - msg = controller_messages[XML] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return text - - def trace(self, testbed_guid, guid, trace_id, attribute='value'): - msg = controller_messages[TRACE] - attribute = base64.b64encode(attribute) - msg = msg % (testbed_guid, guid, trace_id, attribute) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == OK: - return text - raise RuntimeError(text) - - def start(self): - msg = controller_messages[START] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def stop(self): - msg = controller_messages[STOP] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def recover(self): - msg = controller_messages[RECOVER] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def is_finished(self, guid): - msg = controller_messages[FINISHED] - msg = msg % guid - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return text == "True" - - def set(self, testbed_guid, guid, name, value, time = TIME_NOW): - msg = testbed_messages[EXPERIMENT_SET] - type = get_type(value) - # avoid having "|" in this parameters - name = base64.b64encode(name) - value = base64.b64encode(str(value)) - msg = msg % (testbed_guid, guid, name, value, type, time) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - - def get(self, testbed_guid, guid, name, time = TIME_NOW): - msg = testbed_messages[EXPERIMENT_GET] - # avoid having "|" in this parameters - name = base64.b64encode(name) - msg = msg % (testbed_guid, guid, name, time) - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) - return text - - def shutdown(self): - msg = controller_messages[SHUTDOWN] - self._client.send_msg(msg) - reply = self._client.read_reply() - if not reply: - raise RuntimeError, "Invalid reply: %r" % (reply,) - result = reply.split("|") - code = int(result[0]) - text = base64.b64decode(result[1]) - if code == ERROR: - raise RuntimeError(text) + + # Shutdown stops the serverside... + def shutdown(self, _stub = shutdown): + rv = _stub(self) self._client.send_stop() self._client.read_reply() # wait for it + return rv diff --git a/src/nepi/util/proxy_stub.tpl b/src/nepi/util/proxy_stub.tpl new file mode 100644 index 00000000..a4881f97 --- /dev/null +++ b/src/nepi/util/proxy_stub.tpl @@ -0,0 +1,17 @@ +def %(methname)s(%(self)s, %(argdefs)s): + msg = BaseProxy._make_message( + argtypes, + argencoders, + %(command)d, + %(methname)r, + %(classname)r, + %(args)s) + %(self)s._client.send_msg(msg) + reply = %(self)s._client.read_reply() + rv = BaseProxy._parse_reply( + rvtype, + %(methname)r, + %(classname)r, + reply) + return rv + diff --git a/test/testbeds/ns3/integration.py b/test/testbeds/ns3/integration.py index f0926da9..d4a7a4cd 100755 --- a/test/testbeds/ns3/integration.py +++ b/test/testbeds/ns3/integration.py @@ -74,7 +74,8 @@ class Ns3IntegrationTestCase(unittest.TestCase): time.sleep(0.5) ping_result = controller.trace(ns3_desc.guid, iface2.guid, "P2PAsciiTrace") comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)" - self.assertNotEqual(ping_result.find(comp_result), -1) + if ping_result.find(comp_result) == -1: + self.fail("Unexpected trace: %s" % (ping_result,)) controller.stop() controller.shutdown() @@ -147,7 +148,8 @@ class Ns3IntegrationTestCase(unittest.TestCase): ping_result = controller.trace(ns3_desc.guid, iface2.guid, "P2PAsciiTrace") comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)" - self.assertNotEqual(ping_result.find(comp_result), -1) + if ping_result.find(comp_result) == -1: + self.fail("Unexpected trace: %s" % (ping_result,)) controller.stop() controller.shutdown()