# -*- coding: utf-8 -*-
import base64
+import nepi.core.execute
from nepi.core.attributes import AttributesMap, Attribute
from nepi.util import server, validation
from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
import time
import tempfile
import shutil
+import functools
# PROTOCOL REPLIES
OK = 0
EXPERIMENT_SET = 37
EXPERIMENT_GET = 38
-# PARAMETER TYPE
-STRING = 100
-INTEGER = 101
-BOOL = 102
-FLOAT = 103
-
-# EXPERIMENT CONTROLER PROTOCOL MESSAGES
-controller_messages = dict({
- XML: "%d" % XML,
- TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
- FINISHED: "%d|%s" % (FINISHED, "%d"),
- START: "%d" % START,
- STOP: "%d" % STOP,
- RECOVER : "%d" % RECOVER,
- SHUTDOWN: "%d" % SHUTDOWN,
- })
-
-# TESTBED INSTANCE PROTOCOL MESSAGES
-testbed_messages = dict({
- TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
- START: "%d" % START,
- STOP: "%d" % STOP,
- SHUTDOWN: "%d" % SHUTDOWN,
- CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
- CREATE: "%d|%s" % (CREATE, "%d|%s"),
- CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
- FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
- CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
- CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s|%s"),
- ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
- ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
- ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
- DO_SETUP: "%d" % DO_SETUP,
- DO_CREATE: "%d" % DO_CREATE,
- DO_CONNECT_INIT: "%d" % DO_CONNECT_INIT,
- DO_CONNECT_COMPL: "%d" % DO_CONNECT_COMPL,
- DO_CONFIGURE: "%d" % DO_CONFIGURE,
- DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
- DO_CROSS_CONNECT_INIT: "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"),
- DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"),
- GET: "%d|%s" % (GET, "%d|%s|%s"),
- SET: "%d|%s" % (SET, "%d|%s|%s|%d|%s"),
- EXPERIMENT_GET: "%d|%s" % (EXPERIMENT_GET, "%d|%d|%s|%s"),
- EXPERIMENT_SET: "%d|%s" % (EXPERIMENT_SET, "%d|%d|%s|%s|%d|%s"),
- GET_ROUTE: "%d|%s" % (GET_ROUTE, "%d|%d|%s"),
- GET_ADDRESS: "%d|%s" % (GET_ADDRESS, "%d|%d|%s"),
- ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
- STATUS: "%d|%s" % (STATUS, "%s"),
- GUIDS: "%d" % GUIDS,
- GET_ATTRIBUTE_LIST: "%d|%s" % (GET_ATTRIBUTE_LIST,"%d"),
- TESTBED_ID: "%d" % TESTBED_ID,
- TESTBED_VERSION: "%d" % TESTBED_VERSION,
- })
-
instruction_text = dict({
OK: "OK",
ERROR: "ERROR",
ACTION: "ACTION",
STATUS: "STATUS",
GUIDS: "GUIDS",
- STRING: "STRING",
- INTEGER: "INTEGER",
- BOOL: "BOOL",
- FLOAT: "FLOAT",
TESTBED_ID: "TESTBED_ID",
TESTBED_VERSION: "TESTBED_VERSION",
EXPERIMENT_SET: "EXPERIMENT_SET",
EXPERIMENT_GET: "EXPERIMENT_GET",
})
-def get_type(value):
- if isinstance(value, bool):
- return BOOL
- elif isinstance(value, int):
- return INTEGER
- elif isinstance(value, float):
- return FLOAT
- else:
- return STRING
-
-def set_type(type, value):
- if type == INTEGER:
- value = int(value)
- elif type == FLOAT:
- value = float(value)
- elif type == BOOL:
- value = value == "True"
- else:
- value = str(value)
- return value
-
def log_msg(server, params):
- instr = int(params[0])
- instr_txt = instruction_text[instr]
- server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
- instr_txt, ", ".join(map(str, params[1:]))))
+ try:
+ instr = int(params[0])
+ instr_txt = instruction_text[instr]
+ server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
+ instr_txt, ", ".join(map(str, params[1:]))))
+ except:
+ # don't die for logging
+ pass
def log_reply(server, reply):
- res = reply.split("|")
- code = int(res[0])
- code_txt = instruction_text[code]
- txt = base64.b64decode(res[1])
- server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
- code_txt, txt))
+ try:
+ res = reply.split("|")
+ code = int(res[0])
+ code_txt = instruction_text[code]
+ try:
+ txt = base64.b64decode(res[1])
+ except:
+ txt = res[1]
+ server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
+ code_txt, txt))
+ except:
+ # don't die for logging
+ server.log_debug("%s - reply: %s" % (server.__class__.__name__,
+ reply))
+ pass
def to_server_log_level(log_level):
return (
module = sys.modules[mod_name]
return module.TestbedController(testbed_version)
-class TestbedControllerServer(server.Server):
- def __init__(self, root_dir, log_level, testbed_id, testbed_version):
- super(TestbedControllerServer, self).__init__(root_dir, log_level)
- self._testbed_id = testbed_id
- self._testbed_version = testbed_version
- self._testbed = None
-
- def post_daemonize(self):
- self._testbed = _build_testbed_controller(self._testbed_id,
- self._testbed_version)
-
+# Just a namespace class
+class Marshalling:
+ class Decoders:
+ @staticmethod
+ def pickled_data(sdata):
+ return cPickle.loads(base64.b64decode(sdata))
+
+ @staticmethod
+ def base64_data(sdata):
+ return base64.b64decode(sdata)
+
+ @staticmethod
+ def nullint(sdata):
+ return None if sdata == "None" else int(sdata)
+
+ @staticmethod
+ def bool(sdata):
+ return sdata == 'True'
+
+ class Encoders:
+ @staticmethod
+ def pickled_data(data):
+ return base64.b64encode(cPickle.dumps(data))
+
+ @staticmethod
+ def base64_data(data):
+ return base64.b64encode(data)
+
+ @staticmethod
+ def nullint(data):
+ return "None" if data is None else int(data)
+
+ @staticmethod
+ def bool(data):
+ return str(bool(data))
+
+ # import into Marshalling all the decoders
+ # they act as types
+ locals().update([
+ (typname, typ)
+ for typname, typ in vars(Decoders).iteritems()
+ if not typname.startswith('_')
+ ])
+
+ _TYPE_ENCODERS = dict([
+ # id(type) -> (<encoding_function>, <formatting_string>)
+ (typname, (getattr(Encoders,typname),"%s"))
+ for typname in vars(Decoders)
+ if not typname.startswith('_')
+ and hasattr(Encoders,typname)
+ ])
+
+ # Builtins
+ _TYPE_ENCODERS["float"] = (float, "%r")
+ _TYPE_ENCODERS["int"] = (int, "%d")
+ _TYPE_ENCODERS["long"] = (int, "%d")
+ _TYPE_ENCODERS["str"] = (str, "%s")
+ _TYPE_ENCODERS["unicode"] = (str, "%s")
+
+ # Generic encoder
+ _TYPE_ENCODERS[None] = (str, "%s")
+
+ @staticmethod
+ def args(*types):
+ """
+ Decorator that converts the given function into one that takes
+ a single "params" list, with each parameter marshalled according
+ to the given factory callable (type constructors are accepted).
+
+ The first argument (self) is left untouched.
+
+ eg:
+
+ @Marshalling.args(int,int,str,base64_data)
+ def somefunc(self, someint, otherint, somestr, someb64):
+ return someretval
+ """
+ def decor(f):
+ @functools.wraps(f)
+ def rv(self, params):
+ return f(self, *[ ctor(val)
+ for ctor,val in zip(types, params[1:]) ])
+
+ rv._argtypes = types
+
+ # Derive type encoders by looking up types in _TYPE_ENCODERS
+ # make_proxy will use it to encode arguments in command strings
+ argencoders = []
+ TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
+ for typ in types:
+ if typ.__name__ in TYPE_ENCODERS:
+ argencoders.append(TYPE_ENCODERS[typ.__name__])
+ else:
+ # generic encoder
+ argencoders.append(TYPE_ENCODERS[None])
+
+ rv._argencoders = tuple(argencoders)
+
+ rv._retval = getattr(f, '_retval', None)
+ return rv
+ return decor
+
+ @staticmethod
+ def retval(typ=Decoders.base64_data):
+ """
+ Decorator that converts the given function into one that
+ returns a properly encoded return string, given that the undecorated
+ function returns suitable input for the encoding function.
+
+ The optional typ argument specifies a type.
+ For the default of base64_data, return values should be strings.
+ The return value of the encoding method should be a string always.
+
+ eg:
+
+ @Marshalling.args(int,int,str,base64_data)
+ @Marshalling.retval(str)
+ def somefunc(self, someint, otherint, somestr, someb64):
+ return someint
+ """
+ encode, fmt = Marshalling._TYPE_ENCODERS.get(
+ typ.__name__,
+ Marshalling._TYPE_ENCODERS[None])
+ fmt = "%d|"+fmt
+
+ def decor(f):
+ @functools.wraps(f)
+ def rv(self, *p, **kw):
+ data = f(self, *p, **kw)
+ return fmt % (
+ OK,
+ encode(data)
+ )
+ rv._retval = typ
+ rv._argtypes = getattr(f, '_argtypes', None)
+ rv._argencoders = getattr(f, '_argencoders', None)
+ return rv
+ return decor
+
+ @staticmethod
+ def retvoid(f):
+ """
+ Decorator that converts the given function into one that
+ always return an encoded empty string.
+
+ Useful for null-returning functions.
+ """
+ OKRV = "%d|" % (OK,)
+
+ @functools.wraps(f)
+ def rv(self, *p, **kw):
+ f(self, *p, **kw)
+ return OKRV
+
+ rv._retval = None
+ rv._argtypes = getattr(f, '_argtypes', None)
+ rv._argencoders = getattr(f, '_argencoders', None)
+ return rv
+
+ @staticmethod
+ def handles(whichcommand):
+ """
+ Associates the method with a given command code for servers.
+ It should always be the topmost decorator.
+ """
+ def decor(f):
+ f._handles_command = whichcommand
+ return f
+ return decor
+
+class BaseServer(server.Server):
def reply_action(self, msg):
if not msg:
result = base64.b64encode("Invalid command line")
instruction = int(params[0])
log_msg(self, params)
try:
- if instruction == TRACE:
- reply = self.trace(params)
- elif instruction == START:
- reply = self.start(params)
- elif instruction == STOP:
- reply = self.stop(params)
- elif instruction == SHUTDOWN:
- reply = self.shutdown(params)
- elif instruction == CONFIGURE:
- reply = self.defer_configure(params)
- elif instruction == CREATE:
- reply = self.defer_create(params)
- elif instruction == CREATE_SET:
- reply = self.defer_create_set(params)
- elif instruction == FACTORY_SET:
- reply = self.defer_factory_set(params)
- elif instruction == CONNECT:
- reply = self.defer_connect(params)
- elif instruction == CROSS_CONNECT:
- reply = self.defer_cross_connect(params)
- elif instruction == ADD_TRACE:
- reply = self.defer_add_trace(params)
- elif instruction == ADD_ADDRESS:
- reply = self.defer_add_address(params)
- elif instruction == ADD_ROUTE:
- reply = self.defer_add_route(params)
- elif instruction == DO_SETUP:
- reply = self.do_setup(params)
- elif instruction == DO_CREATE:
- reply = self.do_create(params)
- elif instruction == DO_CONNECT_INIT:
- reply = self.do_connect_init(params)
- elif instruction == DO_CONNECT_COMPL:
- reply = self.do_connect_compl(params)
- elif instruction == DO_CONFIGURE:
- reply = self.do_configure(params)
- elif instruction == DO_PRECONFIGURE:
- reply = self.do_preconfigure(params)
- elif instruction == DO_CROSS_CONNECT_INIT:
- reply = self.do_cross_connect_init(params)
- elif instruction == DO_CROSS_CONNECT_COMPL:
- reply = self.do_cross_connect_compl(params)
- elif instruction == GET:
- reply = self.get(params)
- elif instruction == SET:
- reply = self.set(params)
- elif instruction == GET_ADDRESS:
- reply = self.get_address(params)
- elif instruction == GET_ROUTE:
- reply = self.get_route(params)
- elif instruction == ACTION:
- reply = self.action(params)
- elif instruction == STATUS:
- reply = self.status(params)
- elif instruction == GUIDS:
- reply = self.guids(params)
- elif instruction == GET_ATTRIBUTE_LIST:
- reply = self.get_attribute_list(params)
- elif instruction == TESTBED_ID:
- reply = self.testbed_id(params)
- elif instruction == TESTBED_VERSION:
- reply = self.testbed_version(params)
+ for mname,meth in vars(self.__class__).iteritems():
+ if not mname.startswith('_'):
+ cmd = getattr(meth, '_handles_command', None)
+ if cmd == instruction:
+ meth = getattr(self, mname)
+ reply = meth(params)
+ break
else:
error = "Invalid instruction %s" % instruction
self.log_error(error)
log_reply(self, reply)
return reply
- def guids(self, params):
- guids = self._testbed.guids
- value = cPickle.dumps(guids)
- result = base64.b64encode(value)
- return "%d|%s" % (OK, result)
-
- def testbed_id(self, params):
- testbed_id = self._testbed.testbed_id
- result = base64.b64encode(str(testbed_id))
- return "%d|%s" % (OK, result)
-
- def testbed_version(self, params):
- testbed_version = self._testbed.testbed_version
- result = base64.b64encode(str(testbed_version))
- return "%d|%s" % (OK, result)
-
- def defer_create(self, params):
- guid = int(params[1])
- factory_id = params[2]
+class TestbedControllerServer(BaseServer):
+ def __init__(self, root_dir, log_level, testbed_id, testbed_version):
+ super(TestbedControllerServer, self).__init__(root_dir, log_level)
+ self._testbed_id = testbed_id
+ self._testbed_version = testbed_version
+ self._testbed = None
+
+ def post_daemonize(self):
+ self._testbed = _build_testbed_controller(self._testbed_id,
+ self._testbed_version)
+
+ @Marshalling.handles(GUIDS)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def guids(self):
+ return self._testbed.guids
+
+ @Marshalling.handles(TESTBED_ID)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def testbed_id(self):
+ return str(self._testbed.testbed_id)
+
+ @Marshalling.handles(TESTBED_VERSION)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def testbed_version(self):
+ return str(self._testbed.testbed_version)
+
+ @Marshalling.handles(CREATE)
+ @Marshalling.args(int, str)
+ @Marshalling.retvoid
+ def defer_create(self, guid, factory_id):
self._testbed.defer_create(guid, factory_id)
- return "%d|%s" % (OK, "")
- def trace(self, params):
- guid = int(params[1])
- trace_id = params[2]
- attribute = base64.b64decode(params[3])
- trace = self._testbed.trace(guid, trace_id, attribute)
- result = base64.b64encode(trace)
- return "%d|%s" % (OK, result)
+ @Marshalling.handles(TRACE)
+ @Marshalling.args(int, str, Marshalling.base64_data)
+ @Marshalling.retval()
+ def trace(self, guid, trace_id, attribute):
+ return self._testbed.trace(guid, trace_id, attribute)
- def start(self, params):
+ @Marshalling.handles(START)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def start(self):
self._testbed.start()
- return "%d|%s" % (OK, "")
- def stop(self, params):
+ @Marshalling.handles(STOP)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def stop(self):
self._testbed.stop()
- return "%d|%s" % (OK, "")
- def shutdown(self, params):
+ @Marshalling.handles(SHUTDOWN)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def shutdown(self):
self._testbed.shutdown()
- return "%d|%s" % (OK, "")
- def defer_configure(self, params):
- name = base64.b64decode(params[1])
- value = base64.b64decode(params[2])
- type = int(params[3])
- value = set_type(type, value)
+ @Marshalling.handles(CONFIGURE)
+ @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
+ @Marshalling.retvoid
+ def defer_configure(self, name, value):
self._testbed.defer_configure(name, value)
- return "%d|%s" % (OK, "")
-
- def defer_create_set(self, params):
- guid = int(params[1])
- name = base64.b64decode(params[2])
- value = base64.b64decode(params[3])
- type = int(params[4])
- value = set_type(type, value)
+
+ @Marshalling.handles(CREATE_SET)
+ @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
+ @Marshalling.retvoid
+ def defer_create_set(self, guid, name, value):
self._testbed.defer_create_set(guid, name, value)
- return "%d|%s" % (OK, "")
- def defer_factory_set(self, params):
- name = base64.b64decode(params[1])
- value = base64.b64decode(params[2])
- type = int(params[3])
- value = set_type(type, value)
+ @Marshalling.handles(FACTORY_SET)
+ @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
+ @Marshalling.retvoid
+ def defer_factory_set(self, name, value):
self._testbed.defer_factory_set(name, value)
- return "%d|%s" % (OK, "")
- def defer_connect(self, params):
- guid1 = int(params[1])
- connector_type_name1 = params[2]
- guid2 = int(params[3])
- connector_type_name2 = params[4]
+ @Marshalling.handles(CONNECT)
+ @Marshalling.args(int, str, int, str)
+ @Marshalling.retvoid
+ def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
self._testbed.defer_connect(guid1, connector_type_name1, guid2,
connector_type_name2)
- return "%d|%s" % (OK, "")
-
- def defer_cross_connect(self, params):
- guid = int(params[1])
- connector_type_name = params[2]
- cross_guid = int(params[3])
- cross_testbed_guid = int(params[4])
- cross_testbed_id = params[5]
- cross_factory_id = params[6]
- cross_connector_type_name = params[7]
+
+ @Marshalling.handles(CROSS_CONNECT)
+ @Marshalling.args(int, str, int, int, str, str, str)
+ @Marshalling.retvoid
+ def defer_cross_connect(self,
+ guid, connector_type_name,
+ cross_guid, cross_testbed_guid,
+ cross_testbed_id, cross_factory_id,
+ cross_connector_type_name):
self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
cross_testbed_guid, cross_testbed_id, cross_factory_id,
cross_connector_type_name)
- return "%d|%s" % (OK, "")
- def defer_add_trace(self, params):
- guid = int(params[1])
- trace_id = params[2]
+ @Marshalling.handles(ADD_TRACE)
+ @Marshalling.args(int, str)
+ @Marshalling.retvoid
+ def defer_add_trace(self, guid, trace_id):
self._testbed.defer_add_trace(guid, trace_id)
- return "%d|%s" % (OK, "")
- def defer_add_address(self, params):
- guid = int(params[1])
- address = params[2]
- netprefix = int(params[3])
- broadcast = params[4]
+ @Marshalling.handles(ADD_ADDRESS)
+ @Marshalling.args(int, str, int, str)
+ @Marshalling.retvoid
+ def defer_add_address(self, guid, address, netprefix, broadcast):
self._testbed.defer_add_address(guid, address, netprefix,
broadcast)
- return "%d|%s" % (OK, "")
- def defer_add_route(self, params):
- guid = int(params[1])
- destination = params[2]
- netprefix = int(params[3])
- nexthop = params[4]
+ @Marshalling.handles(ADD_ROUTE)
+ @Marshalling.args(int, str, int, str)
+ @Marshalling.retvoid
+ def defer_add_route(self, guid, destination, netprefix, nexthop):
self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
- return "%d|%s" % (OK, "")
- def do_setup(self, params):
+ @Marshalling.handles(DO_SETUP)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_setup(self):
self._testbed.do_setup()
- return "%d|%s" % (OK, "")
- def do_create(self, params):
+ @Marshalling.handles(DO_CREATE)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_create(self):
self._testbed.do_create()
- return "%d|%s" % (OK, "")
- def do_connect_init(self, params):
+ @Marshalling.handles(DO_CONNECT_INIT)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_connect_init(self):
self._testbed.do_connect_init()
- return "%d|%s" % (OK, "")
- def do_connect_compl(self, params):
+ @Marshalling.handles(DO_CONNECT_COMPL)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_connect_compl(self):
self._testbed.do_connect_compl()
- return "%d|%s" % (OK, "")
- def do_configure(self, params):
+ @Marshalling.handles(DO_CONFIGURE)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_configure(self):
self._testbed.do_configure()
- return "%d|%s" % (OK, "")
- def do_preconfigure(self, params):
+ @Marshalling.handles(DO_PRECONFIGURE)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_preconfigure(self):
self._testbed.do_preconfigure()
- return "%d|%s" % (OK, "")
- def do_cross_connect_init(self, params):
- pcross_data = base64.b64decode(params[1])
- cross_data = cPickle.loads(pcross_data)
+ @Marshalling.handles(DO_CROSS_CONNECT_INIT)
+ @Marshalling.args( Marshalling.Decoders.pickled_data )
+ @Marshalling.retvoid
+ def do_cross_connect_init(self, cross_data):
self._testbed.do_cross_connect_init(cross_data)
- return "%d|%s" % (OK, "")
- def do_cross_connect_compl(self, params):
- pcross_data = base64.b64decode(params[1])
- cross_data = cPickle.loads(pcross_data)
+ @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
+ @Marshalling.args( Marshalling.Decoders.pickled_data )
+ @Marshalling.retvoid
+ def do_cross_connect_compl(self, cross_data):
self._testbed.do_cross_connect_compl(cross_data)
- return "%d|%s" % (OK, "")
-
- def get(self, params):
- guid = int(params[1])
- name = base64.b64decode(params[2])
- time = params[3]
- value = self._testbed.get(guid, name, time)
- result = base64.b64encode(str(value))
- return "%d|%s" % (OK, result)
-
- def set(self, params):
- guid = int(params[1])
- name = base64.b64decode(params[2])
- value = base64.b64decode(params[3])
- type = int(params[2])
- time = params[4]
- value = set_type(type, value)
+
+ @Marshalling.handles(GET)
+ @Marshalling.args(int, Marshalling.base64_data, str)
+ @Marshalling.retval()
+ def get(self, guid, name, time):
+ return str(self._testbed.get(guid, name, time))
+
+ @Marshalling.handles(SET)
+ @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
+ @Marshalling.retvoid
+ def set(self, guid, name, value, time):
self._testbed.set(guid, name, value, time)
- return "%d|%s" % (OK, "")
-
- def get_address(self, params):
- guid = int(params[1])
- index = int(params[2])
- attribute = base64.b64decode(params[3])
- value = self._testbed.get_address(guid, index, attribute)
- result = base64.b64encode(str(value))
- return "%d|%s" % (OK, result)
-
- def get_route(self, params):
- guid = int(params[1])
- index = int(params[2])
- attribute = base64.b64decode(params[3])
- value = self._testbed.get_route(guid, index, attribute)
- result = base64.b64encode(str(value))
- return "%d|%s" % (OK, result)
-
- def action(self, params):
- time = params[1]
- guid = int(params[2])
- command = base64.b64decode(params[3])
+
+ @Marshalling.handles(GET_ADDRESS)
+ @Marshalling.args(int, int, Marshalling.base64_data)
+ @Marshalling.retval()
+ def get_address(self, guid, index, attribute):
+ return str(self._testbed.get_address(guid, index, attribute))
+
+ @Marshalling.handles(GET_ROUTE)
+ @Marshalling.args(int, int, Marshalling.base64_data)
+ @Marshalling.retval()
+ def get_route(self, guid, index, attribute):
+ return str(self._testbed.get_route(guid, index, attribute))
+
+ @Marshalling.handles(ACTION)
+ @Marshalling.args(str, int, Marshalling.base64_data)
+ @Marshalling.retvoid
+ def action(self, time, guid, command):
self._testbed.action(time, guid, command)
- return "%d|%s" % (OK, "")
-
- def status(self, params):
- guid = None
- if params[1] != "None":
- guid = int(params[1])
- status = self._testbed.status(guid)
- result = base64.b64encode(str(status))
- return "%d|%s" % (OK, result)
-
- def get_attribute_list(self, params):
- guid = int(params[1])
- attr_list = self._testbed.get_attribute_list(guid)
- value = cPickle.dumps(attr_list)
- result = base64.b64encode(value)
- return "%d|%s" % (OK, result)
-
-class ExperimentControllerServer(server.Server):
+
+ @Marshalling.handles(STATUS)
+ @Marshalling.args(Marshalling.nullint)
+ @Marshalling.retval(int)
+ def status(self, guid):
+ return self._testbed.status(guid)
+
+ @Marshalling.handles(GET_ATTRIBUTE_LIST)
+ @Marshalling.args(int)
+ @Marshalling.retval( Marshalling.pickled_data )
+ def get_attribute_list(self, guid):
+ return self._testbed.get_attribute_list(guid)
+
+class ExperimentControllerServer(BaseServer):
def __init__(self, root_dir, log_level, experiment_xml):
super(ExperimentControllerServer, self).__init__(root_dir, log_level)
self._experiment_xml = experiment_xml
self._controller = ExperimentController(self._experiment_xml,
root_dir = self._root_dir)
- def reply_action(self, msg):
- if not msg:
- result = base64.b64encode("Invalid command line")
- reply = "%d|%s" % (ERROR, result)
- else:
- params = msg.split("|")
- instruction = int(params[0])
- log_msg(self, params)
- try:
- if instruction == XML:
- reply = self.experiment_xml(params)
- elif instruction == TRACE:
- reply = self.trace(params)
- elif instruction == FINISHED:
- reply = self.is_finished(params)
- elif instruction == EXPERIMENT_GET:
- reply = self.get(params)
- elif instruction == EXPERIMENT_SET:
- reply = self.set(params)
- elif instruction == START:
- reply = self.start(params)
- elif instruction == STOP:
- reply = self.stop(params)
- elif instruction == RECOVER:
- reply = self.recover(params)
- elif instruction == SHUTDOWN:
- reply = self.shutdown(params)
- else:
- error = "Invalid instruction %s" % instruction
- self.log_error(error)
- result = base64.b64encode(error)
- reply = "%d|%s" % (ERROR, result)
- except:
- error = self.log_error()
- result = base64.b64encode(error)
- reply = "%d|%s" % (ERROR, result)
- log_reply(self, reply)
- return reply
-
- def experiment_xml(self, params):
- xml = self._controller.experiment_xml
- result = base64.b64encode(xml)
- return "%d|%s" % (OK, result)
+ @Marshalling.handles(XML)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def experiment_xml(self):
+ return self._controller.experiment_xml
- def trace(self, params):
- testbed_guid = int(params[1])
- guid = int(params[2])
- trace_id = params[3]
- attribute = base64.b64decode(params[4])
- trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
- result = base64.b64encode(trace)
- return "%d|%s" % (OK, result)
-
- def is_finished(self, params):
- guid = int(params[1])
- status = self._controller.is_finished(guid)
- result = base64.b64encode(str(status))
- return "%d|%s" % (OK, result)
-
- def get(self, params):
- testbed_guid = int(param[1])
- guid = int(params[2])
- name = base64.b64decode(params[3])
- value = self._controller.get(testbed_guid, guid, name, time)
- time = params[4]
- result = base64.b64encode(str(value))
- return "%d|%s" % (OK, result)
-
- def set(self, params):
- testbed_guid = int(params[1])
- guid = int(params[2])
- name = base64.b64decode(params[3])
- value = base64.b64decode(params[4])
- type = int(params[3])
- time = params[5]
- value = set_type(type, value)
+ @Marshalling.handles(TRACE)
+ @Marshalling.args(int, int, str, Marshalling.base64_data)
+ @Marshalling.retval()
+ def trace(self, testbed_guid, guid, trace_id, attribute):
+ return str(self._controller.trace(testbed_guid, guid, trace_id, attribute))
+
+ @Marshalling.handles(FINISHED)
+ @Marshalling.args(int)
+ @Marshalling.retval(Marshalling.bool)
+ def is_finished(self, guid):
+ return self._controller.is_finished(guid)
+
+ @Marshalling.handles(EXPERIMENT_GET)
+ @Marshalling.args(int, int, Marshalling.base64_data, str)
+ @Marshalling.retval()
+ def get(self, testbed_guid, guid, name, time):
+ return str(self._controller.get(testbed_guid, guid, name, time))
+
+ @Marshalling.handles(EXPERIMENT_SET)
+ @Marshalling.args(int, int, Marshalling.base64_data, Marshalling.pickled_data, str)
+ @Marshalling.retvoid
+ def set(self, testbed_guid, guid, name, value, time):
self._controller.set(testbed_guid, guid, name, value, time)
- return "%d|%s" % (OK, "")
- def start(self, params):
+ @Marshalling.handles(START)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def start(self):
self._controller.start()
- return "%d|%s" % (OK, "")
- def stop(self, params):
+ @Marshalling.handles(STOP)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def stop(self):
self._controller.stop()
- return "%d|%s" % (OK, "")
- def recover(self, params):
+ @Marshalling.handles(RECOVER)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def recover(self):
self._controller.recover()
- return "%d|%s" % (OK, "")
- def shutdown(self, params):
+ @Marshalling.handles(SHUTDOWN)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def shutdown(self):
self._controller.shutdown()
- return "%d|%s" % (OK, "")
-class TestbedControllerProxy(object):
- def __init__(self, root_dir, log_level, testbed_id = None,
- testbed_version = None, launch = True, host = None,
+class BaseProxy(object):
+ _ServerClass = None
+ _ServerClassModule = "nepi.util.proxy"
+
+ def __init__(self,
+ ctor_args, root_dir,
+ launch = True, host = None,
port = None, user = None, ident_key = None, agent = None,
environment_setup = ""):
if launch:
- if testbed_id == None or testbed_version == None:
- raise RuntimeError("To launch a TesbedInstance server a \
- testbed_id and testbed_version are required")
# ssh
if host != None:
- python_code = "from nepi.util.proxy import "\
- "TestbedControllerServer;"\
- "s = TestbedControllerServer('%s', %d, '%s', '%s');"\
- "s.run()" % (root_dir, log_level, testbed_id,
- testbed_version)
+ python_code = (
+ "from %(classmodule)s import %(classname)s;"
+ "s = %(classname)s%(ctor_args)r;"
+ "s.run()"
+ % dict(
+ classname = self._ServerClass.__name__,
+ classmodule = self._ServerClassModule,
+ ctor_args = ctor_args
+ ) )
proc = server.popen_ssh_subprocess(python_code, host = host,
port = port, user = user, agent = agent,
ident_key = ident_key,
waitcommand = True)
if proc.poll():
err = proc.stderr.read()
- raise RuntimeError("Server could not be executed: %s" % \
- err)
+ raise RuntimeError, "Server could not be executed: %s" % (err,)
else:
# launch daemon
- s = TestbedControllerServer(root_dir, log_level, testbed_id,
- testbed_version)
+ s = self._ServerClass(*ctor_args)
s.run()
# connect client to server
self._client = server.Client(root_dir, host = host, port = port,
user = user, agent = agent,
environment_setup = environment_setup)
-
- @property
- def guids(self):
- msg = testbed_messages[GUIDS]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- guids = cPickle.loads(text)
- return guids
-
- @property
- def testbed_id(self):
- msg = testbed_messages[TESTBED_ID]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return str(text)
-
- @property
- def testbed_version(self):
- msg = testbed_messages[TESTBED_VERSION]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return str(text)
-
- def defer_configure(self, name, value):
- msg = testbed_messages[CONFIGURE]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_create(self, guid, factory_id):
- msg = testbed_messages[CREATE]
- msg = msg % (guid, factory_id)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_create_set(self, guid, name, value):
- msg = testbed_messages[CREATE_SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (guid, name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_factory_set(self, guid, name, value):
- msg = testbed_messages[FACTORY_SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (guid, name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_connect(self, guid1, connector_type_name1, guid2,
- connector_type_name2):
- msg = testbed_messages[CONNECT]
- msg = msg % (guid1, connector_type_name1, guid2,
- connector_type_name2)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_cross_connect(self, guid, connector_type_name, cross_guid,
- cross_testbed_guid, cross_testbed_id, cross_factory_id,
- cross_connector_type_name):
- msg = testbed_messages[CROSS_CONNECT]
- msg = msg % (guid, connector_type_name, cross_guid, cross_testbed_guid,
- cross_testbed_id, cross_factory_id, cross_connector_type_name)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_trace(self, guid, trace_id):
- msg = testbed_messages[ADD_TRACE]
- msg = msg % (guid, trace_id)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_address(self, guid, address, netprefix, broadcast):
- msg = testbed_messages[ADD_ADDRESS]
- msg = msg % (guid, address, netprefix, broadcast)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_route(self, guid, destination, netprefix, nexthop):
- msg = testbed_messages[ADD_ROUTE]
- msg = msg % (guid, destination, netprefix, nexthop)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_setup(self):
- msg = testbed_messages[DO_SETUP]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_create(self):
- msg = testbed_messages[DO_CREATE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_connect_init(self):
- msg = testbed_messages[DO_CONNECT_INIT]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_connect_compl(self):
- msg = testbed_messages[DO_CONNECT_COMPL]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_configure(self):
- msg = testbed_messages[DO_CONFIGURE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_preconfigure(self):
- msg = testbed_messages[DO_PRECONFIGURE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_cross_connect_init(self, cross_data):
- msg = testbed_messages[DO_CROSS_CONNECT_INIT]
- pcross_data = cPickle.dumps(cross_data)
- cross_data = base64.b64encode(pcross_data)
- msg = msg % (cross_data)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_cross_connect_compl(self, cross_data):
- msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
- pcross_data = cPickle.dumps(cross_data)
- cross_data = base64.b64encode(pcross_data)
- msg = msg % (cross_data)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def start(self, time = TIME_NOW):
- msg = testbed_messages[START]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def stop(self, time = TIME_NOW):
- msg = testbed_messages[STOP]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def set(self, guid, name, value, time = TIME_NOW):
- msg = testbed_messages[SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (guid, name, value, type, time)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def get(self, guid, name, time = TIME_NOW):
- msg = testbed_messages[GET]
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- msg = msg % (guid, name, time)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def get_address(self, guid, index, attribute):
- msg = testbed_messages[GET_ADDRESS]
- # avoid having "|" in this parameters
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, index, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def get_route(self, guid, index, attribute):
- msg = testbed_messages[GET_ROUTE]
- # avoid having "|" in this parameters
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, index, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def action(self, time, guid, action):
- msg = testbed_messages[ACTION]
- msg = msg % (time, guid, action)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def status(self, guid = None):
- msg = testbed_messages[STATUS]
- msg = msg % (guid,)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return int(text)
-
- def trace(self, guid, trace_id, attribute='value'):
- msg = testbed_messages[TRACE]
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, trace_id, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def get_attribute_list(self, guid):
- msg = testbed_messages[GET_ATTRIBUTE_LIST]
- msg = msg % (guid,)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- attr_list = cPickle.loads(text)
- return attr_list
-
- def shutdown(self):
- msg = testbed_messages[SHUTDOWN]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+
+ @staticmethod
+ def _make_message(argtypes, argencoders, command, methname, classname, *args):
+ if len(argtypes) != len(argencoders):
+ raise ValueError, "Invalid arguments for _make_message: "\
+ "in stub method %s of class %s "\
+ "argtypes and argencoders must match in size" % (
+ methname, classname )
+ if len(argtypes) != len(args):
+ raise ValueError, "Invalid arguments for _make_message: "\
+ "in stub method %s of class %s "\
+ "expected %d arguments, got %d" % (
+ methname, classname,
+ len(argtypes), len(args))
+
+ buf = []
+ for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
+ try:
+ buf.append(fmt % encode(val))
+ except:
+ import traceback
+ raise TypeError, "Argument %d of stub method %s of class %s "\
+ "requires a value of type %s, but got %s - nested error: %s" % (
+ argnum, methname, classname,
+ getattr(typ, '__name__', typ), type(val),
+ traceback.format_exc()
+ )
+
+ return "%d|%s" % (command, '|'.join(buf))
+
+ @staticmethod
+ def _parse_reply(rvtype, methname, classname, reply):
+ if not reply:
+ raise RuntimeError, "Invalid reply: %r "\
+ "for stub method %s of class %s" % (
+ reply,
+ methname,
+ classname)
+
+ try:
+ result = reply.split("|")
+ code = int(result[0])
+ text = result[1]
+ except:
+ import traceback
+ raise TypeError, "Return value of stub method %s of class %s "\
+ "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
+ methname, classname,
+ getattr(rvtype, '__name__', rvtype), reply,
+ traceback.format_exc()
+ )
+ if code == ERROR:
+ text = base64.b64decode(text)
+ raise RuntimeError(text)
+ elif code == OK:
+ try:
+ if rvtype is None:
+ return
+ else:
+ return rvtype(text)
+ except:
+ import traceback
+ raise TypeError, "Return value of stub method %s of class %s "\
+ "cannot be parsed: must be of type %s - nested error: %s" % (
+ methname, classname,
+ getattr(rvtype, '__name__', rvtype),
+ traceback.format_exc()
+ )
+ else:
+ raise RuntimeError, "Invalid reply: %r "\
+ "for stub method %s of class %s - unknown code" % (
+ reply,
+ methname,
+ classname)
+
+ @staticmethod
+ def _make_stubs(server_class, template_class):
+ """
+ Returns a dictionary method_name -> method
+ with stub methods.
+
+ Usage:
+
+ class SomeProxy(BaseProxy):
+ ...
+
+ locals().update( BaseProxy._make_stubs(
+ ServerClass,
+ TemplateClass
+ ) )
+
+ ServerClass is the corresponding Server class, as
+ specified in the _ServerClass class method (_make_stubs
+ is static and can't access the method), and TemplateClass
+ is the ultimate implementation class behind the server,
+ from which argument names and defaults are taken, to
+ maintain meaningful interfaces.
+ """
+ rv = {}
+
+ class NONE: pass
+
+ import os.path
+ func_template_path = os.path.join(
+ os.path.dirname(__file__),
+ 'proxy_stub.tpl')
+ func_template_file = open(func_template_path, "r")
+ func_template = func_template_file.read()
+ func_template_file.close()
+
+ for methname in vars(template_class):
+ if hasattr(server_class, methname) and not methname.startswith('_'):
+ template_meth = getattr(template_class, methname)
+ server_meth = getattr(server_class, methname)
+
+ command = getattr(server_meth, '_handles_command', None)
+ argtypes = getattr(server_meth, '_argtypes', None)
+ argencoders = getattr(server_meth, '_argencoders', None)
+ rvtype = getattr(server_meth, '_retval', None)
+ doprop = False
+
+ if hasattr(template_meth, 'fget'):
+ # property getter
+ template_meth = template_meth.fget
+ doprop = True
+
+ if command is not None and argtypes is not None and argencoders is not None:
+ # We have an interface method...
+ code = template_meth.func_code
+ argnames = code.co_varnames[:code.co_argcount]
+ argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
+ + (template_meth.func_defaults or ()) )
+
+ func_globals = dict(
+ BaseProxy = BaseProxy,
+ argtypes = argtypes,
+ argencoders = argencoders,
+ rvtype = rvtype,
+ )
+ context = dict()
+
+ func_text = func_template % dict(
+ self = argnames[0],
+ args = '%s' % (','.join(argnames[1:])),
+ argdefs = ','.join([
+ argname if argdef is NONE
+ else "%s=%r" % (argname, argdef)
+ for argname, argdef in zip(argnames[1:], argdefaults[1:])
+ ]),
+ command = command,
+ methname = methname,
+ classname = server_class.__name__
+ )
+
+ func_text = compile(
+ func_text,
+ func_template_path,
+ 'exec')
+
+ exec func_text in func_globals, context
+
+ if doprop:
+ rv[methname] = property(context[methname])
+ else:
+ rv[methname] = context[methname]
+
+ return rv
+
+class TestbedControllerProxy(BaseProxy):
+
+ _ServerClass = TestbedControllerServer
+
+ def __init__(self, root_dir, log_level, testbed_id = None,
+ testbed_version = None, launch = True, host = None,
+ port = None, user = None, ident_key = None, agent = None,
+ environment_setup = ""):
+ if launch and (testbed_id == None or testbed_version == None):
+ raise RuntimeError("To launch a TesbedControllerServer a "
+ "testbed_id and testbed_version are required")
+ super(TestbedControllerProxy,self).__init__(
+ ctor_args = (root_dir, log_level, testbed_id, testbed_version),
+ root_dir = root_dir,
+ launch = launch, host = host, port = port, user = user,
+ ident_key = ident_key, agent = agent,
+ environment_setup = environment_setup)
+
+ locals().update( BaseProxy._make_stubs(
+ server_class = TestbedControllerServer,
+ template_class = nepi.core.execute.TestbedController,
+ ) )
+
+ # Shutdown stops the serverside...
+ def shutdown(self, _stub = shutdown):
+ rv = _stub(self)
self._client.send_stop()
self._client.read_reply() # wait for it
+ return rv
+
-class ExperimentControllerProxy(object):
+class ExperimentControllerProxy(BaseProxy):
+ _ServerClass = ExperimentControllerServer
+
def __init__(self, root_dir, log_level, experiment_xml = None,
launch = True, host = None, port = None, user = None,
ident_key = None, agent = None, environment_setup = ""):
- if launch:
- # launch server
- if experiment_xml == None:
- raise RuntimeError("To launch a ExperimentControllerServer a \
- xml description of the experiment is required")
- # ssh
- if host != None:
- xml = experiment_xml
- python_code = "from nepi.util.proxy import ExperimentControllerServer;\
- s = ExperimentControllerServer(%r, %r, %r);\
- s.run()" % (root_dir, log_level, xml)
- proc = server.popen_ssh_subprocess(python_code, host = host,
- port = port, user = user, agent = agent,
- ident_key = ident_key,
- environment_setup = environment_setup,
- waitcommand = True)
- if proc.poll():
- err = proc.stderr.read()
- raise RuntimeError("Server could not be executed: %s" % \
- err)
- else:
- # launch daemon
- s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
- s.run()
+ if launch and experiment_xml is None:
+ raise RuntimeError("To launch a ExperimentControllerServer a \
+ xml description of the experiment is required")
+ super(ExperimentControllerProxy,self).__init__(
+ ctor_args = (root_dir, log_level, experiment_xml),
+ root_dir = root_dir,
+ launch = launch, host = host, port = port, user = user,
+ ident_key = ident_key, agent = agent,
+ environment_setup = environment_setup)
+
+ locals().update( BaseProxy._make_stubs(
+ server_class = ExperimentControllerServer,
+ template_class = nepi.core.execute.ExperimentController,
+ ) )
- # connect client to server
- self._client = server.Client(root_dir, host = host, port = port,
- user = user, agent = agent,
- environment_setup = environment_setup)
-
- @property
- def experiment_xml(self):
- msg = controller_messages[XML]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def trace(self, testbed_guid, guid, trace_id, attribute='value'):
- msg = controller_messages[TRACE]
- attribute = base64.b64encode(attribute)
- msg = msg % (testbed_guid, guid, trace_id, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == OK:
- return text
- raise RuntimeError(text)
-
- def start(self):
- msg = controller_messages[START]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def stop(self):
- msg = controller_messages[STOP]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def recover(self):
- msg = controller_messages[RECOVER]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def is_finished(self, guid):
- msg = controller_messages[FINISHED]
- msg = msg % guid
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text == "True"
-
- def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
- msg = testbed_messages[EXPERIMENT_SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (testbed_guid, guid, name, value, type, time)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def get(self, testbed_guid, guid, name, time = TIME_NOW):
- msg = testbed_messages[EXPERIMENT_GET]
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- msg = msg % (testbed_guid, guid, name, time)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def shutdown(self):
- msg = controller_messages[SHUTDOWN]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- if not reply:
- raise RuntimeError, "Invalid reply: %r" % (reply,)
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+
+ # Shutdown stops the serverside...
+ def shutdown(self, _stub = shutdown):
+ rv = _stub(self)
self._client.send_stop()
self._client.read_reply() # wait for it
+ return rv