# -*- coding: utf-8 -*-
import base64
+import nepi.core.execute
+import nepi.util.environ
from nepi.core.attributes import AttributesMap, Attribute
from nepi.util import server, validation
-from nepi.util.constants import TIME_NOW
+from nepi.util.constants import TIME_NOW, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, DeploymentConfiguration as DC
import getpass
import cPickle
import sys
import time
import tempfile
import shutil
+import functools
+import os
# PROTOCOL REPLIES
OK = 0
# PROTOCOL INSTRUCTION MESSAGES
XML = 2
-ACCESS = 3
TRACE = 4
FINISHED = 5
START = 6
GET_ROUTE = 28
GET_ADDRESS = 29
RECOVER = 30
-DO_PRECONFIGURE = 31
-GET_ATTRIBUTE_LIST = 32
-DO_CONNECT_COMPL = 33
+DO_PRECONFIGURE = 31
+GET_ATTRIBUTE_LIST = 32
+DO_CONNECT_COMPL = 33
DO_CROSS_CONNECT_COMPL = 34
-
-# PARAMETER TYPE
-STRING = 100
-INTEGER = 101
-BOOL = 102
-FLOAT = 103
-
-# EXPERIMENT CONTROLER PROTOCOL MESSAGES
-controller_messages = dict({
- XML: "%d" % XML,
- ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
- TRACE: "%d|%s" % (TRACE, "%d|%d|%s|%s"),
- FINISHED: "%d|%s" % (FINISHED, "%d"),
- START: "%d" % START,
- STOP: "%d" % STOP,
- RECOVER : "%d" % RECOVER,
- SHUTDOWN: "%d" % SHUTDOWN,
- })
-
-# TESTBED INSTANCE PROTOCOL MESSAGES
-testbed_messages = dict({
- TRACE: "%d|%s" % (TRACE, "%d|%s|%s"),
- START: "%d" % START,
- STOP: "%d" % STOP,
- SHUTDOWN: "%d" % SHUTDOWN,
- CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
- CREATE: "%d|%s" % (CREATE, "%d|%s"),
- CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
- FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
- CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
- CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
- ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
- ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%s|%d|%s"),
- ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
- DO_SETUP: "%d" % DO_SETUP,
- DO_CREATE: "%d" % DO_CREATE,
- DO_CONNECT_INIT: "%d" % DO_CONNECT_INIT,
- DO_CONNECT_COMPL: "%d" % DO_CONNECT_COMPL,
- DO_CONFIGURE: "%d" % DO_CONFIGURE,
- DO_PRECONFIGURE: "%d" % DO_PRECONFIGURE,
- DO_CROSS_CONNECT_INIT: "%d|%s" % (DO_CROSS_CONNECT_INIT, "%s"),
- DO_CROSS_CONNECT_COMPL: "%d|%s" % (DO_CROSS_CONNECT_COMPL, "%s"),
- GET: "%d|%s" % (GET, "%s|%d|%s"),
- SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
- GET_ROUTE: "%d|%s" % (GET, "%d|%d|%s"),
- GET_ADDRESS: "%d|%s" % (GET, "%d|%d|%s"),
- ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
- STATUS: "%d|%s" % (STATUS, "%d"),
- GUIDS: "%d" % GUIDS,
- GET_ATTRIBUTE_LIST: "%d" % GET_ATTRIBUTE_LIST,
- })
+TESTBED_ID = 35
+TESTBED_VERSION = 36
+DO_PRESTART = 37
+GET_FACTORY_ID = 38
+GET_TESTBED_ID = 39
+GET_TESTBED_VERSION = 40
+TRACES_INFO = 41
+EXEC_XML = 42
+TESTBED_STATUS = 43
+STARTED_TIME = 44
+STOPPED_TIME = 45
instruction_text = dict({
OK: "OK",
ERROR: "ERROR",
XML: "XML",
- ACCESS: "ACCESS",
+ EXEC_XML: "EXEC_XML",
TRACE: "TRACE",
FINISHED: "FINISHED",
START: "START",
GET_ROUTE: "GET_ROUTE",
GET_ADDRESS: "GET_ADDRESS",
GET_ATTRIBUTE_LIST: "GET_ATTRIBUTE_LIST",
+ GET_FACTORY_ID: "GET_FACTORY_ID",
+ GET_TESTBED_ID: "GET_TESTBED_ID",
+ GET_TESTBED_VERSION: "GET_TESTBED_VERSION",
ACTION: "ACTION",
STATUS: "STATUS",
GUIDS: "GUIDS",
- STRING: "STRING",
- INTEGER: "INTEGER",
- BOOL: "BOOL",
- FLOAT: "FLOAT"
- })
+ TESTBED_ID: "TESTBED_ID",
+ TESTBED_VERSION: "TESTBED_VERSION",
+ TRACES_INFO: "TRACES_INFO",
+ STARTED_TIME: "STARTED_TIME",
+ STOPPED_TIME: "STOPPED_TIME",
-def get_type(value):
- if isinstance(value, bool):
- return BOOL
- elif isinstance(value, int):
- return INTEGER
- elif isinstance(value, float):
- return FLOAT
- else:
- return STRING
-
-def set_type(type, value):
- if type == INTEGER:
- value = int(value)
- elif type == FLOAT:
- value = float(value)
- elif type == BOOL:
- value = value == "True"
- else:
- value = str(value)
- return value
+ })
def log_msg(server, params):
- instr = int(params[0])
- instr_txt = instruction_text[instr]
- server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
- instr_txt, ", ".join(map(str, params[1:]))))
+ try:
+ instr = int(params[0])
+ instr_txt = instruction_text[instr]
+ server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
+ instr_txt, ", ".join(map(str, params[1:]))))
+ except:
+ # don't die for logging
+ pass
def log_reply(server, reply):
- res = reply.split("|")
- code = int(res[0])
- code_txt = instruction_text[code]
- txt = base64.b64decode(res[1])
- server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
- code_txt, txt))
+ try:
+ res = reply.split("|")
+ code = int(res[0])
+ code_txt = instruction_text[code]
+ try:
+ txt = base64.b64decode(res[1])
+ except:
+ txt = res[1]
+ server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
+ code_txt, txt))
+ except:
+ # don't die for logging
+ server.log_debug("%s - reply: %s" % (server.__class__.__name__,
+ reply))
+ pass
def to_server_log_level(log_level):
- return server.DEBUG_LEVEL \
- if log_level == AccessConfiguration.DEBUG_LEVEL \
- else server.ERROR_LEVEL
+ return (
+ DC.DEBUG_LEVEL
+ if log_level == DC.DEBUG_LEVEL
+ else DC.ERROR_LEVEL
+ )
def get_access_config_params(access_config):
- root_dir = access_config.get_attribute_value("rootDirectory")
- log_level = access_config.get_attribute_value("logLevel")
+ mode = access_config.get_attribute_value(DC.DEPLOYMENT_MODE)
+ launch = not access_config.get_attribute_value(DC.RECOVER)
+ root_dir = access_config.get_attribute_value(DC.ROOT_DIRECTORY)
+ log_level = access_config.get_attribute_value(DC.LOG_LEVEL)
log_level = to_server_log_level(log_level)
- user = host = port = agent = None
- communication = access_config.get_attribute_value("communication")
- if communication == AccessConfiguration.ACCESS_SSH:
- user = access_config.get_attribute_value("user")
- host = access_config.get_attribute_value("host")
- port = access_config.get_attribute_value("port")
- agent = access_config.get_attribute_value("useAgent")
- return (root_dir, log_level, user, host, port, agent)
+ communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
+ environment_setup = (
+ access_config.get_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
+ if access_config.has_attribute(DC.DEPLOYMENT_ENVIRONMENT_SETUP)
+ else ""
+ )
+ user = access_config.get_attribute_value(DC.DEPLOYMENT_USER)
+ host = access_config.get_attribute_value(DC.DEPLOYMENT_HOST)
+ port = access_config.get_attribute_value(DC.DEPLOYMENT_PORT)
+ agent = access_config.get_attribute_value(DC.USE_AGENT)
+ sudo = access_config.get_attribute_value(DC.USE_SUDO)
+ key = access_config.get_attribute_value(DC.DEPLOYMENT_KEY)
+ communication = access_config.get_attribute_value(DC.DEPLOYMENT_COMMUNICATION)
+ clean_root = access_config.get_attribute_value(DC.CLEAN_ROOT)
+ return (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root)
class AccessConfiguration(AttributesMap):
- MODE_SINGLE_PROCESS = "SINGLE"
- MODE_DAEMON = "DAEMON"
- ACCESS_SSH = "SSH"
- ACCESS_LOCAL = "LOCAL"
- ERROR_LEVEL = "Error"
- DEBUG_LEVEL = "Debug"
-
- def __init__(self):
+ def __init__(self, params = None):
super(AccessConfiguration, self).__init__()
- self.add_attribute(name = "mode",
- help = "Instance execution mode",
- type = Attribute.ENUM,
- value = AccessConfiguration.MODE_SINGLE_PROCESS,
- allowed = [AccessConfiguration.MODE_DAEMON,
- AccessConfiguration.MODE_SINGLE_PROCESS],
- validation_function = validation.is_enum)
- self.add_attribute(name = "communication",
- help = "Instance communication mode",
- type = Attribute.ENUM,
- value = AccessConfiguration.ACCESS_LOCAL,
- allowed = [AccessConfiguration.ACCESS_LOCAL,
- AccessConfiguration.ACCESS_SSH],
- validation_function = validation.is_enum)
- self.add_attribute(name = "host",
- help = "Host where the testbed will be executed",
- type = Attribute.STRING,
- value = "localhost",
- validation_function = validation.is_string)
- self.add_attribute(name = "user",
- help = "User on the Host to execute the testbed",
- type = Attribute.STRING,
- value = getpass.getuser(),
- validation_function = validation.is_string)
- self.add_attribute(name = "port",
- help = "Port on the Host",
- type = Attribute.INTEGER,
- value = 22,
- validation_function = validation.is_integer)
- self.add_attribute(name = "rootDirectory",
- help = "Root directory for storing process files",
- type = Attribute.STRING,
- value = ".",
- validation_function = validation.is_string) # TODO: validation.is_path
- self.add_attribute(name = "useAgent",
- help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
- type = Attribute.BOOL,
- value = False,
- validation_function = validation.is_bool)
- self.add_attribute(name = "logLevel",
- help = "Log level for instance",
- type = Attribute.ENUM,
- value = AccessConfiguration.ERROR_LEVEL,
- allowed = [AccessConfiguration.ERROR_LEVEL,
- AccessConfiguration.DEBUG_LEVEL],
- validation_function = validation.is_enum)
- self.add_attribute(name = "recover",
- help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
- type = Attribute.BOOL,
- value = False,
- validation_function = validation.is_bool)
+
+ from nepi.core.metadata import Metadata
+
+ for _,attr_info in Metadata.PROXY_ATTRIBUTES.iteritems():
+ self.add_attribute(**attr_info)
+
+ if params:
+ for attr_name, attr_value in params.iteritems():
+ parser = Attribute.type_parsers[self.get_attribute_type(attr_name)]
+ attr_value = parser(attr_value)
+ self.set_attribute_value(attr_name, attr_value)
class TempDir(object):
def __init__(self):
def __init__(self, path):
self.path = path
-def create_controller(xml, access_config = None):
- mode = None if not access_config \
- else access_config.get_attribute_value("mode")
- launch = True if not access_config \
- else not access_config.get_attribute_value("recover")
- if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
- if not launch:
- raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
-
+def create_experiment_controller(xml, access_config = None):
+ mode = None
+ launch = True
+ log_level = DC.ERROR_LEVEL
+ if access_config:
+ (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root) \
+ = get_access_config_params(access_config)
+
+ os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+
+ if not mode or mode == DC.MODE_SINGLE_PROCESS:
from nepi.core.execute import ExperimentController
- if not access_config or not access_config.has_attribute("rootDirectory"):
+ if not access_config or not access_config.has_attribute(DC.ROOT_DIRECTORY):
root_dir = TempDir()
else:
- root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
+ root_dir = PermDir(access_config.get_attribute_value(DC.ROOT_DIRECTORY))
controller = ExperimentController(xml, root_dir.path)
# inject reference to temporary dir, so that it gets cleaned
# up at destruction time.
controller._tempdir = root_dir
+ if not launch:
+ # try to recover
+ controller.recover()
+
return controller
- elif mode == AccessConfiguration.MODE_DAEMON:
- (root_dir, log_level, user, host, port, agent) = \
- get_access_config_params(access_config)
- return ExperimentControllerProxy(root_dir, log_level,
- experiment_xml = xml, host = host, port = port, user = user,
- agent = agent, launch = launch)
+ elif mode == DC.MODE_DAEMON:
+ try:
+ return ExperimentControllerProxy(root_dir, log_level,
+ experiment_xml = xml,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
+ except:
+ if not launch:
+ # Maybe controller died, recover from persisted testbed information if possible
+ controller = ExperimentControllerProxy(root_dir, log_level,
+ experiment_xml = xml,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = key,
+ agent = agent,
+ sudo = sudo,
+ launch = True,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
+ controller.recover()
+ return controller
+ else:
+ raise
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def create_testbed_controller(testbed_id, testbed_version, access_config):
- mode = None if not access_config \
- else access_config.get_attribute_value("mode")
- launch = True if not access_config \
- else not access_config.get_attribute_value("recover")
- if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
+ mode = None
+ launch = True
+ log_level = DC.ERROR_LEVEL
+ if access_config:
+ (mode, launch, root_dir, log_level, communication, user, host, port,
+ key, agent, sudo, environment_setup, clean_root) \
+ = get_access_config_params(access_config)
+
+ os.environ["NEPI_CONTROLLER_LOGLEVEL"] = log_level
+
+ if not mode or mode == DC.MODE_SINGLE_PROCESS:
if not launch:
- raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
+ raise ValueError, "Unsupported instantiation mode: %s with launch=False" % (mode,)
return _build_testbed_controller(testbed_id, testbed_version)
- elif mode == AccessConfiguration.MODE_DAEMON:
- (root_dir, log_level, user, host, port, agent) = \
- get_access_config_params(access_config)
- return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
- testbed_version = testbed_version, host = host, port = port,
- user = user, agent = agent, launch = launch)
+ elif mode == DC.MODE_DAEMON:
+ return TestbedControllerProxy(root_dir, log_level,
+ testbed_id = testbed_id,
+ testbed_version = testbed_version,
+ communication = communication,
+ host = host,
+ port = port,
+ ident_key = key,
+ user = user,
+ agent = agent,
+ sudo = sudo,
+ launch = launch,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
raise RuntimeError("Unsupported access configuration '%s'" % mode)
def _build_testbed_controller(testbed_id, testbed_version):
- mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
+ mod_name = nepi.util.environ.find_testbed(testbed_id)
+
if not mod_name in sys.modules:
- __import__(mod_name)
+ try:
+ __import__(mod_name)
+ except ImportError:
+ raise ImportError, "Cannot find module %s in %r" % (mod_name, sys.path)
+
module = sys.modules[mod_name]
- return module.TestbedController(testbed_version)
-
-class TestbedControllerServer(server.Server):
- def __init__(self, root_dir, log_level, testbed_id, testbed_version):
- super(TestbedControllerServer, self).__init__(root_dir, log_level)
- self._testbed_id = testbed_id
- self._testbed_version = testbed_version
- self._testbed = None
-
- def post_daemonize(self):
- self._testbed = _build_testbed_controller(self._testbed_id,
- self._testbed_version)
+ tc = module.TestbedController()
+ if tc.testbed_version != testbed_version:
+ raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
+ (testbed_id, testbed_version, tc.testbed_version))
+ return tc
+
+# Just a namespace class
+class Marshalling:
+ class Decoders:
+ @staticmethod
+ def pickled_data(sdata):
+ return cPickle.loads(base64.b64decode(sdata))
+
+ @staticmethod
+ def base64_data(sdata):
+ return base64.b64decode(sdata)
+
+ @staticmethod
+ def nullint(sdata):
+ return None if sdata == "None" else int(sdata)
+ @staticmethod
+ def bool(sdata):
+ return sdata == 'True'
+
+ class Encoders:
+ @staticmethod
+ def pickled_data(data):
+ return base64.b64encode(cPickle.dumps(data))
+
+ @staticmethod
+ def base64_data(data):
+ return base64.b64encode(data)
+
+ @staticmethod
+ def nullint(data):
+ return "None" if data is None else int(data)
+
+ @staticmethod
+ def bool(data):
+ return str(bool(data))
+
+ # import into Marshalling all the decoders
+ # they act as types
+ locals().update([
+ (typname, typ)
+ for typname, typ in vars(Decoders).iteritems()
+ if not typname.startswith('_')
+ ])
+
+ _TYPE_ENCODERS = dict([
+ # id(type) -> (<encoding_function>, <formatting_string>)
+ (typname, (getattr(Encoders,typname),"%s"))
+ for typname in vars(Decoders)
+ if not typname.startswith('_')
+ and hasattr(Encoders,typname)
+ ])
+
+ # Builtins
+ _TYPE_ENCODERS["float"] = (float, "%r")
+ _TYPE_ENCODERS["int"] = (int, "%d")
+ _TYPE_ENCODERS["long"] = (int, "%d")
+ _TYPE_ENCODERS["str"] = (str, "%s")
+ _TYPE_ENCODERS["unicode"] = (str, "%s")
+
+ # Generic encoder
+ _TYPE_ENCODERS[None] = (str, "%s")
+
+ @staticmethod
+ def args(*types):
+ """
+ Decorator that converts the given function into one that takes
+ a single "params" list, with each parameter marshalled according
+ to the given factory callable (type constructors are accepted).
+
+ The first argument (self) is left untouched.
+
+ eg:
+
+ @Marshalling.args(int,int,str,base64_data)
+ def somefunc(self, someint, otherint, somestr, someb64):
+ return someretval
+ """
+ def decor(f):
+ @functools.wraps(f)
+ def rv(self, params):
+ return f(self, *[ ctor(val)
+ for ctor,val in zip(types, params[1:]) ])
+
+ rv._argtypes = types
+
+ # Derive type encoders by looking up types in _TYPE_ENCODERS
+ # make_proxy will use it to encode arguments in command strings
+ argencoders = []
+ TYPE_ENCODERS = Marshalling._TYPE_ENCODERS
+ for typ in types:
+ if typ.__name__ in TYPE_ENCODERS:
+ argencoders.append(TYPE_ENCODERS[typ.__name__])
+ else:
+ # generic encoder
+ argencoders.append(TYPE_ENCODERS[None])
+
+ rv._argencoders = tuple(argencoders)
+
+ rv._retval = getattr(f, '_retval', None)
+ return rv
+ return decor
+
+ @staticmethod
+ def retval(typ=Decoders.base64_data):
+ """
+ Decorator that converts the given function into one that
+ returns a properly encoded return string, given that the undecorated
+ function returns suitable input for the encoding function.
+
+ The optional typ argument specifies a type.
+ For the default of base64_data, return values should be strings.
+ The return value of the encoding method should be a string always.
+
+ eg:
+
+ @Marshalling.args(int,int,str,base64_data)
+ @Marshalling.retval(str)
+ def somefunc(self, someint, otherint, somestr, someb64):
+ return someint
+ """
+ encode, fmt = Marshalling._TYPE_ENCODERS.get(
+ typ.__name__,
+ Marshalling._TYPE_ENCODERS[None])
+ fmt = "%d|"+fmt
+
+ def decor(f):
+ @functools.wraps(f)
+ def rv(self, *p, **kw):
+ data = f(self, *p, **kw)
+ return fmt % (
+ OK,
+ encode(data)
+ )
+ rv._retval = typ
+ rv._argtypes = getattr(f, '_argtypes', None)
+ rv._argencoders = getattr(f, '_argencoders', None)
+ return rv
+ return decor
+
+ @staticmethod
+ def retvoid(f):
+ """
+ Decorator that converts the given function into one that
+ always return an encoded empty string.
+
+ Useful for null-returning functions.
+ """
+ OKRV = "%d|" % (OK,)
+
+ @functools.wraps(f)
+ def rv(self, *p, **kw):
+ f(self, *p, **kw)
+ return OKRV
+
+ rv._retval = None
+ rv._argtypes = getattr(f, '_argtypes', None)
+ rv._argencoders = getattr(f, '_argencoders', None)
+ return rv
+
+ @staticmethod
+ def handles(whichcommand):
+ """
+ Associates the method with a given command code for servers.
+ It should always be the topmost decorator.
+ """
+ def decor(f):
+ f._handles_command = whichcommand
+ return f
+ return decor
+
+class BaseServer(server.Server):
def reply_action(self, msg):
if not msg:
result = base64.b64encode("Invalid command line")
instruction = int(params[0])
log_msg(self, params)
try:
- if instruction == TRACE:
- reply = self.trace(params)
- elif instruction == START:
- reply = self.start(params)
- elif instruction == STOP:
- reply = self.stop(params)
- elif instruction == SHUTDOWN:
- reply = self.shutdown(params)
- elif instruction == CONFIGURE:
- reply = self.defer_configure(params)
- elif instruction == CREATE:
- reply = self.defer_create(params)
- elif instruction == CREATE_SET:
- reply = self.defer_create_set(params)
- elif instruction == FACTORY_SET:
- reply = self.defer_factory_set(params)
- elif instruction == CONNECT:
- reply = self.defer_connect(params)
- elif instruction == CROSS_CONNECT:
- reply = self.defer_cross_connect(params)
- elif instruction == ADD_TRACE:
- reply = self.defer_add_trace(params)
- elif instruction == ADD_ADDRESS:
- reply = self.defer_add_address(params)
- elif instruction == ADD_ROUTE:
- reply = self.defer_add_route(params)
- elif instruction == DO_SETUP:
- reply = self.do_setup(params)
- elif instruction == DO_CREATE:
- reply = self.do_create(params)
- elif instruction == DO_CONNECT_INIT:
- reply = self.do_connect_init(params)
- elif instruction == DO_CONNECT_COMPL:
- reply = self.do_connect_compl(params)
- elif instruction == DO_CONFIGURE:
- reply = self.do_configure(params)
- elif instruction == DO_PRECONFIGURE:
- reply = self.do_preconfigure(params)
- elif instruction == DO_CROSS_CONNECT_INIT:
- reply = self.do_cross_connect_init(params)
- elif instruction == DO_CROSS_CONNECT_COMPL:
- reply = self.do_cross_connect_compl(params)
- elif instruction == GET:
- reply = self.get(params)
- elif instruction == SET:
- reply = self.set(params)
- elif instruction == GET_ADDRESS:
- reply = self.get_address(params)
- elif instruction == GET_ROUTE:
- reply = self.get_route(params)
- elif instruction == ACTION:
- reply = self.action(params)
- elif instruction == STATUS:
- reply = self.status(params)
- elif instruction == GUIDS:
- reply = self.guids(params)
- elif instruction == GET_ATTRIBUTE_LIST:
- reply = self.get_attribute_list(params)
+ for mname,meth in vars(self.__class__).iteritems():
+ if not mname.startswith('_'):
+ cmd = getattr(meth, '_handles_command', None)
+ if cmd == instruction:
+ meth = getattr(self, mname)
+ reply = meth(params)
+ break
else:
error = "Invalid instruction %s" % instruction
self.log_error(error)
log_reply(self, reply)
return reply
- def guids(self, params):
- guids = self._testbed.guids
- guids = ",".join(map(str, guids))
- result = base64.b64encode(guids)
- return "%d|%s" % (OK, result)
+class TestbedControllerServer(BaseServer):
+ def __init__(self, root_dir, log_level, testbed_id, testbed_version,
+ environment_setup, clean_root):
+ super(TestbedControllerServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup, clean_root = clean_root)
+ self._testbed_id = testbed_id
+ self._testbed_version = testbed_version
+ self._testbed = None
- def defer_create(self, params):
- guid = int(params[1])
- factory_id = params[2]
- self._testbed.defer_create(guid, factory_id)
- return "%d|%s" % (OK, "")
+ def post_daemonize(self):
+ self._testbed = _build_testbed_controller(self._testbed_id,
+ self._testbed_version)
- def trace(self, params):
- guid = int(params[1])
- trace_id = params[2]
- attribute = base64.b64decode(params[3])
- trace = self._testbed.trace(guid, trace_id, attribute)
- result = base64.b64encode(trace)
- return "%d|%s" % (OK, result)
+ @Marshalling.handles(GUIDS)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def guids(self):
+ return self._testbed.guids
+
+ @Marshalling.handles(TESTBED_ID)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def testbed_id(self):
+ return str(self._testbed.testbed_id)
+
+ @Marshalling.handles(TESTBED_VERSION)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def testbed_version(self):
+ return str(self._testbed.testbed_version)
+
+ @Marshalling.handles(CREATE)
+ @Marshalling.args(int, str)
+ @Marshalling.retvoid
+ def defer_create(self, guid, factory_id):
+ self._testbed.defer_create(guid, factory_id)
- def start(self, params):
+ @Marshalling.handles(TRACE)
+ @Marshalling.args(int, str, Marshalling.base64_data)
+ @Marshalling.retval()
+ def trace(self, guid, trace_id, attribute):
+ return self._testbed.trace(guid, trace_id, attribute)
+
+ @Marshalling.handles(TRACES_INFO)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def traces_info(self):
+ return self._testbed.traces_info()
+
+ @Marshalling.handles(START)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def start(self):
self._testbed.start()
- return "%d|%s" % (OK, "")
- def stop(self, params):
+ @Marshalling.handles(STOP)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def stop(self):
self._testbed.stop()
- return "%d|%s" % (OK, "")
- def shutdown(self, params):
+ @Marshalling.handles(SHUTDOWN)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def shutdown(self):
self._testbed.shutdown()
- return "%d|%s" % (OK, "")
- def defer_configure(self, params):
- name = base64.b64decode(params[1])
- value = base64.b64decode(params[2])
- type = int(params[3])
- value = set_type(type, value)
+ @Marshalling.handles(CONFIGURE)
+ @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
+ @Marshalling.retvoid
+ def defer_configure(self, name, value):
self._testbed.defer_configure(name, value)
- return "%d|%s" % (OK, "")
-
- def defer_create_set(self, params):
- guid = int(params[1])
- name = base64.b64decode(params[2])
- value = base64.b64decode(params[3])
- type = int(params[4])
- value = set_type(type, value)
+
+ @Marshalling.handles(CREATE_SET)
+ @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data)
+ @Marshalling.retvoid
+ def defer_create_set(self, guid, name, value):
self._testbed.defer_create_set(guid, name, value)
- return "%d|%s" % (OK, "")
- def defer_factory_set(self, params):
- name = base64.b64decode(params[1])
- value = base64.b64decode(params[2])
- type = int(params[3])
- value = set_type(type, value)
+ @Marshalling.handles(FACTORY_SET)
+ @Marshalling.args(Marshalling.base64_data, Marshalling.pickled_data)
+ @Marshalling.retvoid
+ def defer_factory_set(self, name, value):
self._testbed.defer_factory_set(name, value)
- return "%d|%s" % (OK, "")
- def defer_connect(self, params):
- guid1 = int(params[1])
- connector_type_name1 = params[2]
- guid2 = int(params[3])
- connector_type_name2 = params[4]
+ @Marshalling.handles(CONNECT)
+ @Marshalling.args(int, str, int, str)
+ @Marshalling.retvoid
+ def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
self._testbed.defer_connect(guid1, connector_type_name1, guid2,
connector_type_name2)
- return "%d|%s" % (OK, "")
-
- def defer_cross_connect(self, params):
- guid = int(params[1])
- connector_type_name = params[2]
- cross_guid = int(params[3])
- connector_type_name = params[4]
- cross_guid = int(params[5])
- cross_testbed_id = params[6]
- cross_factory_id = params[7]
- cross_connector_type_name = params[8]
+
+ @Marshalling.handles(CROSS_CONNECT)
+ @Marshalling.args(int, str, int, int, str, str, str)
+ @Marshalling.retvoid
+ def defer_cross_connect(self,
+ guid, connector_type_name,
+ cross_guid, cross_testbed_guid,
+ cross_testbed_id, cross_factory_id,
+ cross_connector_type_name):
self._testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
- cross_testbed_id, cross_factory_id, cross_connector_type_name)
- return "%d|%s" % (OK, "")
+ cross_testbed_guid, cross_testbed_id, cross_factory_id,
+ cross_connector_type_name)
- def defer_add_trace(self, params):
- guid = int(params[1])
- trace_id = params[2]
+ @Marshalling.handles(ADD_TRACE)
+ @Marshalling.args(int, str)
+ @Marshalling.retvoid
+ def defer_add_trace(self, guid, trace_id):
self._testbed.defer_add_trace(guid, trace_id)
- return "%d|%s" % (OK, "")
- def defer_add_address(self, params):
- guid = int(params[1])
- address = params[2]
- netprefix = int(params[3])
- broadcast = params[4]
+ @Marshalling.handles(ADD_ADDRESS)
+ @Marshalling.args(int, str, int, Marshalling.pickled_data)
+ @Marshalling.retvoid
+ def defer_add_address(self, guid, address, netprefix, broadcast):
self._testbed.defer_add_address(guid, address, netprefix,
broadcast)
- return "%d|%s" % (OK, "")
- def defer_add_route(self, params):
- guid = int(params[1])
- destination = params[2]
- netprefix = int(params[3])
- nexthop = params[4]
- self._testbed.defer_add_route(guid, destination, netprefix, nexthop)
- return "%d|%s" % (OK, "")
+ @Marshalling.handles(ADD_ROUTE)
+ @Marshalling.args(int, str, int, str, int)
+ @Marshalling.retvoid
+ def defer_add_route(self, guid, destination, netprefix, nexthop, metric):
+ self._testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
- def do_setup(self, params):
+ @Marshalling.handles(DO_SETUP)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_setup(self):
self._testbed.do_setup()
- return "%d|%s" % (OK, "")
- def do_create(self, params):
+ @Marshalling.handles(DO_CREATE)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_create(self):
self._testbed.do_create()
- return "%d|%s" % (OK, "")
- def do_connect_init(self, params):
+ @Marshalling.handles(DO_CONNECT_INIT)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_connect_init(self):
self._testbed.do_connect_init()
- return "%d|%s" % (OK, "")
- def do_connect_compl(self, params):
+ @Marshalling.handles(DO_CONNECT_COMPL)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_connect_compl(self):
self._testbed.do_connect_compl()
- return "%d|%s" % (OK, "")
- def do_configure(self, params):
+ @Marshalling.handles(DO_CONFIGURE)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_configure(self):
self._testbed.do_configure()
- return "%d|%s" % (OK, "")
- def do_preconfigure(self, params):
+ @Marshalling.handles(DO_PRECONFIGURE)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_preconfigure(self):
self._testbed.do_preconfigure()
- return "%d|%s" % (OK, "")
-
- def do_cross_connect_init(self, params):
- pcross_data = base64.b64decode(params[1])
- cross_data = cPickle.loads(pcross_data)
- self._testbed.do_cross_connect_init(cross_data)
- return "%d|%s" % (OK, "")
-
- def do_cross_connect_compl(self, params):
- pcross_data = base64.b64decode(params[1])
- cross_data = cPickle.loads(pcross_data)
- self._testbed.do_cross_connect_compl(cross_data)
- return "%d|%s" % (OK, "")
-
- def get(self, params):
- time = params[1]
- guid = int(param[2])
- name = base64.b64decode(params[3])
- value = self._testbed.get(time, guid, name)
- result = base64.b64encode(str(value))
- return "%d|%s" % (OK, result)
-
- def set(self, params):
- time = params[1]
- guid = int(params[2])
- name = base64.b64decode(params[3])
- value = base64.b64decode(params[4])
- type = int(params[3])
- value = set_type(type, value)
- self._testbed.set(time, guid, name, value)
- return "%d|%s" % (OK, "")
-
- def get_address(self, params):
- guid = int(param[1])
- index = int(param[2])
- attribute = base64.b64decode(param[3])
- value = self._testbed.get_address(guid, index, attribute)
- result = base64.b64encode(str(value))
- return "%d|%s" % (OK, result)
-
- def get_route(self, params):
- guid = int(param[1])
- index = int(param[2])
- attribute = base64.b64decode(param[3])
- value = self._testbed.get_route(guid, index, attribute)
- result = base64.b64encode(str(value))
- return "%d|%s" % (OK, result)
-
- def action(self, params):
- time = params[1]
- guid = int(params[2])
- command = base64.b64decode(params[3])
- self._testbed.action(time, guid, command)
- return "%d|%s" % (OK, "")
-
- def status(self, params):
- guid = int(params[1])
- status = self._testbed.status(guid)
- result = base64.b64encode(str(status))
- return "%d|%s" % (OK, result)
-
- def get_attribute_list(self, params):
- guid = int(param[1])
- attr_list = self._testbed.get_attribute_list(guid)
- value = cPickle.dumps(attr_list)
- result = base64.b64encode(value)
- return "%d|%s" % (OK, result)
-
-class ExperimentControllerServer(server.Server):
- def __init__(self, root_dir, log_level, experiment_xml):
- super(ExperimentControllerServer, self).__init__(root_dir, log_level)
- self._experiment_xml = experiment_xml
- self._controller = None
-
- def post_daemonize(self):
- from nepi.core.execute import ExperimentController
- self._controller = ExperimentController(self._experiment_xml,
- root_dir = self._root_dir)
-
- def reply_action(self, msg):
- if not msg:
- result = base64.b64encode("Invalid command line")
- reply = "%d|%s" % (ERROR, result)
- else:
- params = msg.split("|")
- instruction = int(params[0])
- log_msg(self, params)
- try:
- if instruction == XML:
- reply = self.experiment_xml(params)
- elif instruction == ACCESS:
- reply = self.set_access_configuration(params)
- elif instruction == TRACE:
- reply = self.trace(params)
- elif instruction == FINISHED:
- reply = self.is_finished(params)
- elif instruction == START:
- reply = self.start(params)
- elif instruction == STOP:
- reply = self.stop(params)
- elif instruction == RECOVER:
- reply = self.recover(params)
- elif instruction == SHUTDOWN:
- reply = self.shutdown(params)
- else:
- error = "Invalid instruction %s" % instruction
- self.log_error(error)
- result = base64.b64encode(error)
- reply = "%d|%s" % (ERROR, result)
- except:
- error = self.log_error()
- result = base64.b64encode(error)
- reply = "%d|%s" % (ERROR, result)
- log_reply(self, reply)
- return reply
-
- def experiment_xml(self, params):
- xml = self._controller.experiment_xml
- result = base64.b64encode(xml)
- return "%d|%s" % (OK, result)
-
- def set_access_configuration(self, params):
- testbed_guid = int(params[1])
- mode = params[2]
- communication = params[3]
- host = params[4]
- user = params[5]
- port = int(params[6])
- root_dir = params[7]
- use_agent = params[8] == "True"
- log_level = params[9]
- access_config = AccessConfiguration()
- access_config.set_attribute_value("mode", mode)
- access_config.set_attribute_value("communication", communication)
- access_config.set_attribute_value("host", host)
- access_config.set_attribute_value("user", user)
- access_config.set_attribute_value("port", port)
- access_config.set_attribute_value("rootDirectory", root_dir)
- access_config.set_attribute_value("useAgent", use_agent)
- access_config.set_attribute_value("logLevel", log_level)
- self._controller.set_access_configuration(testbed_guid,
- access_config)
- return "%d|%s" % (OK, "")
-
- def trace(self, params):
- testbed_guid = int(params[1])
- guid = int(params[2])
- trace_id = params[3]
- attribute = base64.b64decode(params[4])
- trace = self._controller.trace(testbed_guid, guid, trace_id, attribute)
- result = base64.b64encode(trace)
- return "%d|%s" % (OK, result)
-
- def is_finished(self, params):
- guid = int(params[1])
- status = self._controller.is_finished(guid)
- result = base64.b64encode(str(status))
- return "%d|%s" % (OK, result)
-
- def start(self, params):
- self._controller.start()
- return "%d|%s" % (OK, "")
-
- def stop(self, params):
- self._controller.stop()
- return "%d|%s" % (OK, "")
-
- def recover(self, params):
- self._controller.recover()
- return "%d|%s" % (OK, "")
-
- def shutdown(self, params):
- self._controller.shutdown()
- return "%d|%s" % (OK, "")
-
-class TestbedControllerProxy(object):
- def __init__(self, root_dir, log_level, testbed_id = None,
- testbed_version = None, launch = True, host = None,
- port = None, user = None, agent = None):
- if launch:
- if testbed_id == None or testbed_version == None:
- raise RuntimeError("To launch a TesbedInstance server a \
- testbed_id and testbed_version are required")
- # ssh
- if host != None:
- python_code = "from nepi.util.proxy import \
- TesbedInstanceServer;\
- s = TestbedControllerServer('%s', %d, '%s', '%s');\
- s.run()" % (root_dir, log_level, testbed_id,
- testbed_version)
- proc = server.popen_ssh_subprocess(python_code, host = host,
- port = port, user = user, agent = agent)
- if proc.poll():
- err = proc.stderr.read()
- raise RuntimeError("Server could not be executed: %s" % \
- err)
- else:
- # launch daemon
- s = TestbedControllerServer(root_dir, log_level, testbed_id,
- testbed_version)
- s.run()
-
- # connect client to server
- self._client = server.Client(root_dir, host = host, port = port,
- user = user, agent = agent)
-
- @property
- def guids(self):
- msg = testbed_messages[GUIDS]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return map(int, text.split(","))
-
- def defer_configure(self, name, value):
- msg = testbed_messages[CONFIGURE]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_create(self, guid, factory_id):
- msg = testbed_messages[CREATE]
- msg = msg % (guid, factory_id)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_create_set(self, guid, name, value):
- msg = testbed_messages[CREATE_SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (guid, name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_factory_set(self, guid, name, value):
- msg = testbed_messages[FACTORY_SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (guid, name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_connect(self, guid1, connector_type_name1, guid2,
- connector_type_name2):
- msg = testbed_messages[CONNECT]
- msg = msg % (guid1, connector_type_name1, guid2,
- connector_type_name2)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- def defer_cross_connect(self, guid, connector_type_name, cross_guid,
- cross_testbed_id, cross_factory_id, cross_connector_type_name):
- msg = testbed_messages[CROSS_CONNECT]
- msg = msg % (guid, connector_type_name, cross_guid,
- cross_testbed_id, cross_factory_id, cross_connector_type_name)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_trace(self, guid, trace_id):
- msg = testbed_messages[ADD_TRACE]
- msg = msg % (guid, trace_id)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_address(self, guid, address, netprefix, broadcast):
- msg = testbed_messages[ADD_ADDRESS]
- msg = msg % (guid, address, netprefix, broadcast)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def defer_add_route(self, guid, destination, netprefix, nexthop):
- msg = testbed_messages[ADD_ROUTE]
- msg = msg % (guid, destination, netprefix, nexthop)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_setup(self):
- msg = testbed_messages[DO_SETUP]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_create(self):
- msg = testbed_messages[DO_CREATE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_connect_init(self):
- msg = testbed_messages[DO_CONNECT_INIT]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_connect_compl(self):
- msg = testbed_messages[DO_CONNECT_COMPL]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_configure(self):
- msg = testbed_messages[DO_CONFIGURE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def do_preconfigure(self):
- msg = testbed_messages[DO_PRECONFIGURE]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+ @Marshalling.handles(DO_PRESTART)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_prestart(self):
+ self._testbed.do_prestart()
+ @Marshalling.handles(DO_CROSS_CONNECT_INIT)
+ @Marshalling.args( Marshalling.Decoders.pickled_data )
+ @Marshalling.retvoid
def do_cross_connect_init(self, cross_data):
- msg = testbed_messages[DO_CROSS_CONNECT_INIT]
- pcross_data = cPickle.dumps(cross_data)
- cross_data = base64.b64encode(pcross_data)
- msg = msg % (cross_data)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+ self._testbed.do_cross_connect_init(cross_data)
+ @Marshalling.handles(DO_CROSS_CONNECT_COMPL)
+ @Marshalling.args( Marshalling.Decoders.pickled_data )
+ @Marshalling.retvoid
def do_cross_connect_compl(self, cross_data):
- msg = testbed_messages[DO_CROSS_CONNECT_COMPL]
- pcross_data = cPickle.dumps(cross_data)
- cross_data = base64.b64encode(pcross_data)
- msg = msg % (cross_data)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def start(self, time = TIME_NOW):
- msg = testbed_messages[START]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def stop(self, time = TIME_NOW):
- msg = testbed_messages[STOP]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def set(self, time, guid, name, value):
- msg = testbed_messages[SET]
- type = get_type(value)
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- value = base64.b64encode(str(value))
- msg = msg % (time, guid, name, value, type)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def get(self, time, guid, name):
- msg = testbed_messages[GET]
- # avoid having "|" in this parameters
- name = base64.b64encode(name)
- msg = msg % (time, guid, name)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
+ self._testbed.do_cross_connect_compl(cross_data)
+ @Marshalling.handles(GET)
+ @Marshalling.args(int, Marshalling.base64_data, str)
+ @Marshalling.retval( Marshalling.pickled_data )
+ def get(self, guid, name, time):
+ return self._testbed.get(guid, name, time)
+
+ @Marshalling.handles(SET)
+ @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
+ @Marshalling.retvoid
+ def set(self, guid, name, value, time):
+ self._testbed.set(guid, name, value, time)
+
+ @Marshalling.handles(GET_ADDRESS)
+ @Marshalling.args(int, int, Marshalling.base64_data)
+ @Marshalling.retval()
def get_address(self, guid, index, attribute):
- msg = testbed_messages[GET_ADDRESS]
- # avoid having "|" in this parameters
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, index, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
+ return str(self._testbed.get_address(guid, index, attribute))
+ @Marshalling.handles(GET_ROUTE)
+ @Marshalling.args(int, int, Marshalling.base64_data)
+ @Marshalling.retval()
def get_route(self, guid, index, attribute):
- msg = testbed_messages[GET_ROUTE]
- # avoid having "|" in this parameters
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, index, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def action(self, time, guid, action):
- msg = testbed_messages[ACTION]
- msg = msg % (time, guid, action)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+ return str(self._testbed.get_route(guid, index, attribute))
+
+ @Marshalling.handles(ACTION)
+ @Marshalling.args(str, int, Marshalling.base64_data)
+ @Marshalling.retvoid
+ def action(self, time, guid, command):
+ self._testbed.action(time, guid, command)
+ @Marshalling.handles(STATUS)
+ @Marshalling.args(Marshalling.nullint)
+ @Marshalling.retval(int)
def status(self, guid):
- msg = testbed_messages[STATUS]
- msg = msg % (guid)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return int(text)
-
- def trace(self, guid, trace_id, attribute='value'):
- msg = testbed_messages[TRACE]
- attribute = base64.b64encode(attribute)
- msg = msg % (guid, trace_id, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def get_attribute_list(self, guid):
- msg = testbed_messages[GET_ATTRIBUTE_LIST]
- msg = msg % (guid)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- attr_list = cPickle.loads(text)
- return attr_list
+ return self._testbed.status(guid)
+
+ @Marshalling.handles(TESTBED_STATUS)
+ @Marshalling.args()
+ @Marshalling.retval(int)
+ def testbed_status(self):
+ return self._testbed.testbed_status()
+
+ @Marshalling.handles(GET_ATTRIBUTE_LIST)
+ @Marshalling.args(int, Marshalling.nullint, Marshalling.bool)
+ @Marshalling.retval( Marshalling.pickled_data )
+ def get_attribute_list(self, guid, filter_flags = None, exclude = False):
+ return self._testbed.get_attribute_list(guid, filter_flags, exclude)
+
+ @Marshalling.handles(GET_FACTORY_ID)
+ @Marshalling.args(int)
+ @Marshalling.retval()
+ def get_factory_id(self, guid):
+ return self._testbed.get_factory_id(guid)
+
+ @Marshalling.handles(RECOVER)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def recover(self):
+ self._testbed.recover()
- def shutdown(self):
- msg = testbed_messages[SHUTDOWN]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- self._client.send_stop()
- self._client.read_reply() # wait for it
-class ExperimentControllerProxy(object):
- def __init__(self, root_dir, log_level, experiment_xml = None,
- launch = True, host = None, port = None, user = None,
- agent = None):
- if launch:
- # launch server
- if experiment_xml == None:
- raise RuntimeError("To launch a ExperimentControllerServer a \
- xml description of the experiment is required")
- # ssh
- if host != None:
- xml = experiment_xml
- python_code = "from nepi.util.proxy import ExperimentControllerServer;\
- s = ExperimentControllerServer(%r, %r, %r);\
- s.run()" % (root_dir, log_level, xml)
- proc = server.popen_ssh_subprocess(python_code, host = host,
- port = port, user = user, agent = agent)
- if proc.poll():
- err = proc.stderr.read()
- raise RuntimeError("Server could not be executed: %s" % \
- err)
- else:
- # launch daemon
- s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
- s.run()
+class ExperimentControllerServer(BaseServer):
+ def __init__(self, root_dir, log_level, experiment_xml, environment_setup,
+ clean_root):
+ super(ExperimentControllerServer, self).__init__(root_dir, log_level,
+ environment_setup = environment_setup, clean_root = clean_root)
+ self._experiment_xml = experiment_xml
+ self._experiment = None
- # connect client to server
- self._client = server.Client(root_dir, host = host, port = port,
- user = user, agent = agent)
-
- @property
- def experiment_xml(self):
- msg = controller_messages[XML]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text
-
- def set_access_configuration(self, testbed_guid, access_config):
- mode = access_config.get_attribute_value("mode")
- communication = access_config.get_attribute_value("communication")
- host = access_config.get_attribute_value("host")
- user = access_config.get_attribute_value("user")
- port = access_config.get_attribute_value("port")
- root_dir = access_config.get_attribute_value("rootDirectory")
- use_agent = access_config.get_attribute_value("useAgent")
- log_level = access_config.get_attribute_value("logLevel")
- msg = controller_messages[ACCESS]
- msg = msg % (testbed_guid, mode, communication, host, user, port,
- root_dir, use_agent, log_level)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+ def post_daemonize(self):
+ from nepi.core.execute import ExperimentController
+ self._experiment = ExperimentController(self._experiment_xml,
+ root_dir = self._root_dir)
- def trace(self, testbed_guid, guid, trace_id, attribute='value'):
- msg = controller_messages[TRACE]
- attribute = base64.b64encode(attribute)
- msg = msg % (testbed_guid, guid, trace_id, attribute)
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == OK:
- return text
- raise RuntimeError(text)
+ @Marshalling.handles(GUIDS)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def guids(self):
+ return self._experiment.guids
+
+ @Marshalling.handles(STARTED_TIME)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def started_time(self):
+ return self._experiment.started_time
+
+ @Marshalling.handles(STOPPED_TIME)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def stopped_time(self):
+ return self._experiment.stopped_time
+
+ @Marshalling.handles(XML)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def experiment_design_xml(self):
+ return self._experiment.experiment_design_xml
+
+ @Marshalling.handles(EXEC_XML)
+ @Marshalling.args()
+ @Marshalling.retval()
+ def experiment_execute_xml(self):
+ return self._experiment.experiment_execute_xml
+
+ @Marshalling.handles(TRACE)
+ @Marshalling.args(int, str, Marshalling.base64_data)
+ @Marshalling.retval()
+ def trace(self, guid, trace_id, attribute):
+ return str(self._experiment.trace(guid, trace_id, attribute))
+
+ @Marshalling.handles(TRACES_INFO)
+ @Marshalling.args()
+ @Marshalling.retval( Marshalling.pickled_data )
+ def traces_info(self):
+ return self._experiment.traces_info()
+
+ @Marshalling.handles(FINISHED)
+ @Marshalling.args(int)
+ @Marshalling.retval(Marshalling.bool)
+ def is_finished(self, guid):
+ return self._experiment.is_finished(guid)
+ @Marshalling.handles(STATUS)
+ @Marshalling.args(int)
+ @Marshalling.retval(int)
+ def status(self, guid):
+ return self._experiment.is_finished(guid)
+
+ @Marshalling.handles(GET)
+ @Marshalling.args(int, Marshalling.base64_data, str)
+ @Marshalling.retval( Marshalling.pickled_data )
+ def get(self, guid, name, time):
+ return self._experiment.get(guid, name, time)
+
+ @Marshalling.handles(SET)
+ @Marshalling.args(int, Marshalling.base64_data, Marshalling.pickled_data, str)
+ @Marshalling.retvoid
+ def set(self, guid, name, value, time):
+ self._experiment.set(guid, name, value, time)
+
+ @Marshalling.handles(START)
+ @Marshalling.args()
+ @Marshalling.retvoid
def start(self):
- msg = controller_messages[START]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+ self._experiment.start()
+ @Marshalling.handles(STOP)
+ @Marshalling.args()
+ @Marshalling.retvoid
def stop(self):
- msg = controller_messages[STOP]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
+ self._experiment.stop()
+ @Marshalling.handles(RECOVER)
+ @Marshalling.args()
+ @Marshalling.retvoid
def recover(self):
- msg = controller_messages[RECOVER]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
-
- def is_finished(self, guid):
- msg = controller_messages[FINISHED]
- msg = msg % guid
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
- if code == ERROR:
- raise RuntimeError(text)
- return text == "True"
+ self._experiment.recover()
+ @Marshalling.handles(SHUTDOWN)
+ @Marshalling.args()
+ @Marshalling.retvoid
def shutdown(self):
- msg = controller_messages[SHUTDOWN]
- self._client.send_msg(msg)
- reply = self._client.read_reply()
- result = reply.split("|")
- code = int(result[0])
- text = base64.b64decode(result[1])
+ self._experiment.shutdown()
+
+ @Marshalling.handles(GET_TESTBED_ID)
+ @Marshalling.args(int)
+ @Marshalling.retval()
+ def get_testbed_id(self, guid):
+ return self._experiment.get_testbed_id(guid)
+
+ @Marshalling.handles(GET_FACTORY_ID)
+ @Marshalling.args(int)
+ @Marshalling.retval()
+ def get_factory_id(self, guid):
+ return self._experiment.get_factory_id(guid)
+
+ @Marshalling.handles(GET_TESTBED_VERSION)
+ @Marshalling.args(int)
+ @Marshalling.retval()
+ def get_testbed_version(self, guid):
+ return self._experiment.get_testbed_version(guid)
+
+class BaseProxy(object):
+ _ServerClass = None
+ _ServerClassModule = "nepi.util.proxy"
+
+ def __init__(self, ctor_args, root_dir,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
+ if launch:
+ python_code = (
+ "from %(classmodule)s import %(classname)s;"
+ "s = %(classname)s%(ctor_args)r;"
+ "s.run()"
+ % dict(
+ classname = self._ServerClass.__name__,
+ classmodule = self._ServerClassModule,
+ ctor_args = ctor_args
+ ) )
+ proc = server.popen_python(python_code,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ ident_key = ident_key,
+ sudo = sudo,
+ environment_setup = environment_setup)
+ # Wait for the server to be ready, otherwise nobody
+ # will be able to connect to it
+ err = []
+ helo = "nope"
+ while helo:
+ helo = proc.stderr.readline()
+ if helo == 'SERVER_READY.\n':
+ break
+ err.append(helo)
+ else:
+ raise AssertionError, "Expected 'SERVER_READY.', got: %s" % (''.join(err),)
+ # connect client to server
+ self._client = server.Client(root_dir,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ agent = agent,
+ sudo = sudo,
+ environment_setup = environment_setup)
+
+ @staticmethod
+ def _make_message(argtypes, argencoders, command, methname, classname, *args):
+ if len(argtypes) != len(argencoders):
+ raise ValueError, "Invalid arguments for _make_message: "\
+ "in stub method %s of class %s "\
+ "argtypes and argencoders must match in size" % (
+ methname, classname )
+ if len(argtypes) != len(args):
+ raise ValueError, "Invalid arguments for _make_message: "\
+ "in stub method %s of class %s "\
+ "expected %d arguments, got %d" % (
+ methname, classname,
+ len(argtypes), len(args))
+
+ buf = []
+ for argnum, (typ, (encode, fmt), val) in enumerate(zip(argtypes, argencoders, args)):
+ try:
+ buf.append(fmt % encode(val))
+ except:
+ import traceback
+ raise TypeError, "Argument %d of stub method %s of class %s "\
+ "requires a value of type %s, but got %s - nested error: %s" % (
+ argnum, methname, classname,
+ getattr(typ, '__name__', typ), type(val),
+ traceback.format_exc()
+ )
+
+ return "%d|%s" % (command, '|'.join(buf))
+
+ @staticmethod
+ def _parse_reply(rvtype, methname, classname, reply):
+ if not reply:
+ raise RuntimeError, "Invalid reply: %r "\
+ "for stub method %s of class %s" % (
+ reply,
+ methname,
+ classname)
+
+ try:
+ result = reply.split("|")
+ code = int(result[0])
+ text = result[1]
+ except:
+ import traceback
+ raise TypeError, "Return value of stub method %s of class %s "\
+ "cannot be parsed: must be of type %s, got %r - nested error: %s" % (
+ methname, classname,
+ getattr(rvtype, '__name__', rvtype), reply,
+ traceback.format_exc()
+ )
if code == ERROR:
+ text = base64.b64decode(text)
raise RuntimeError(text)
+ elif code == OK:
+ try:
+ if rvtype is None:
+ return
+ else:
+ return rvtype(text)
+ except:
+ import traceback
+ raise TypeError, "Return value of stub method %s of class %s "\
+ "cannot be parsed: must be of type %s - nested error: %s" % (
+ methname, classname,
+ getattr(rvtype, '__name__', rvtype),
+ traceback.format_exc()
+ )
+ else:
+ raise RuntimeError, "Invalid reply: %r "\
+ "for stub method %s of class %s - unknown code" % (
+ reply,
+ methname,
+ classname)
+
+ @staticmethod
+ def _make_stubs(server_class, template_class):
+ """
+ Returns a dictionary method_name -> method
+ with stub methods.
+
+ Usage:
+
+ class SomeProxy(BaseProxy):
+ ...
+
+ locals().update( BaseProxy._make_stubs(
+ ServerClass,
+ TemplateClass
+ ) )
+
+ ServerClass is the corresponding Server class, as
+ specified in the _ServerClass class method (_make_stubs
+ is static and can't access the method), and TemplateClass
+ is the ultimate implementation class behind the server,
+ from which argument names and defaults are taken, to
+ maintain meaningful interfaces.
+ """
+ rv = {}
+
+ class NONE: pass
+
+ import os.path
+ func_template_path = os.path.join(
+ os.path.dirname(__file__),
+ 'proxy_stub.tpl')
+ func_template_file = open(func_template_path, "r")
+ func_template = func_template_file.read()
+ func_template_file.close()
+
+ for methname in vars(template_class).copy():
+ if methname.endswith('_deferred'):
+ # cannot wrap deferreds...
+ continue
+ dmethname = methname+'_deferred'
+ if hasattr(server_class, methname) and not methname.startswith('_'):
+ template_meth = getattr(template_class, methname)
+ server_meth = getattr(server_class, methname)
+
+ command = getattr(server_meth, '_handles_command', None)
+ argtypes = getattr(server_meth, '_argtypes', None)
+ argencoders = getattr(server_meth, '_argencoders', None)
+ rvtype = getattr(server_meth, '_retval', None)
+ doprop = False
+
+ if hasattr(template_meth, 'fget'):
+ # property getter
+ template_meth = template_meth.fget
+ doprop = True
+
+ if command is not None and argtypes is not None and argencoders is not None:
+ # We have an interface method...
+ code = template_meth.func_code
+ argnames = code.co_varnames[:code.co_argcount]
+ argdefaults = ( (NONE,) * (len(argnames) - len(template_meth.func_defaults or ()))
+ + (template_meth.func_defaults or ()) )
+
+ func_globals = dict(
+ BaseProxy = BaseProxy,
+ argtypes = argtypes,
+ argencoders = argencoders,
+ rvtype = rvtype,
+ functools = functools,
+ )
+ context = dict()
+
+ func_text = func_template % dict(
+ self = argnames[0],
+ args = '%s' % (','.join(argnames[1:])),
+ argdefs = ','.join([
+ argname if argdef is NONE
+ else "%s=%r" % (argname, argdef)
+ for argname, argdef in zip(argnames[1:], argdefaults[1:])
+ ]),
+ command = command,
+ methname = methname,
+ classname = server_class.__name__
+ )
+
+ func_text = compile(
+ func_text,
+ func_template_path,
+ 'exec')
+
+ exec func_text in func_globals, context
+
+ if doprop:
+ rv[methname] = property(context[methname])
+ rv[dmethname] = property(context[dmethname])
+ else:
+ rv[methname] = context[methname]
+ rv[dmethname] = context[dmethname]
+
+ # inject _deferred into core classes
+ if hasattr(template_class, methname) and not hasattr(template_class, dmethname):
+ def freezename(methname, dmethname):
+ def dmeth(self, *p, **kw):
+ return getattr(self, methname)(*p, **kw)
+ dmeth.__name__ = dmethname
+ return dmeth
+ dmeth = freezename(methname, dmethname)
+ setattr(template_class, dmethname, dmeth)
+
+ return rv
+
+class TestbedControllerProxy(BaseProxy):
+
+ _ServerClass = TestbedControllerServer
+
+ def __init__(self, root_dir, log_level,
+ testbed_id = None,
+ testbed_version = None,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
+ if launch and (testbed_id == None or testbed_version == None):
+ raise RuntimeError("To launch a TesbedControllerServer a "
+ "testbed_id and testbed_version are required")
+ super(TestbedControllerProxy,self).__init__(
+ ctor_args = (root_dir, log_level, testbed_id, testbed_version,
+ environment_setup, clean_root),
+ root_dir = root_dir,
+ launch = launch,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
+ environment_setup = environment_setup)
+
+ locals().update( BaseProxy._make_stubs(
+ server_class = TestbedControllerServer,
+ template_class = nepi.core.execute.TestbedController,
+ ) )
+
+ # Shutdown stops the serverside...
+ def shutdown(self, _stub = shutdown):
+ rv = _stub(self)
+ self._client.send_stop()
+ self._client.read_reply() # wait for it
+ return rv
+
+
+class ExperimentControllerProxy(BaseProxy):
+ _ServerClass = ExperimentControllerServer
+
+ def __init__(self, root_dir, log_level,
+ experiment_xml = None,
+ launch = True,
+ communication = DC.ACCESS_LOCAL,
+ host = None,
+ port = None,
+ user = None,
+ ident_key = None,
+ agent = None,
+ sudo = False,
+ environment_setup = "",
+ clean_root = False):
+ super(ExperimentControllerProxy,self).__init__(
+ ctor_args = (root_dir, log_level, experiment_xml, environment_setup,
+ clean_root),
+ root_dir = root_dir,
+ launch = launch,
+ communication = communication,
+ host = host,
+ port = port,
+ user = user,
+ ident_key = ident_key,
+ agent = agent,
+ sudo = sudo,
+ environment_setup = environment_setup,
+ clean_root = clean_root)
+
+ locals().update( BaseProxy._make_stubs(
+ server_class = ExperimentControllerServer,
+ template_class = nepi.core.execute.ExperimentController,
+ ) )
+
+
+ # Shutdown stops the serverside...
+ def shutdown(self, _stub = shutdown):
+ rv = _stub(self)
self._client.send_stop()
self._client.read_reply() # wait for it
+ return rv