#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import base64
from nepi.core.attributes import AttributesMap, Attribute
-from nepi.util import server
-from nepi.util import validation
+from nepi.util import server, validation
+from nepi.util.constants import TIME_NOW
+import getpass
import sys
-
-# PROTOCOL MESSAGES
-XML = "xml"
-ACCESS = "access"
-TRACE = "trace"
-FINISHED = "finished"
-START = "start"
-STOP = "stop"
-SHUTDOWN = "shutdown"
-CONFIGURE = "configure"
-CREATE = "create"
-CREATE_SET = "create_set"
-FACTORY_SET = "factory_set"
-CONNECT = "connect"
-CROSS_CONNECT = "cross_connect"
-ADD_TRACE = "add_trace"
-ADD_ADDRESS = "add_address"
-ADD_ROUTE = "add_route"
-DO_SETUP = "do_setup"
-DO_CREATE = "do_create"
-DO_CONNECT = "do_connect"
-DO_CONFIGURE = "do_configure"
-DO_CROSS_CONNECT = "do_cross_connect"
-GET = "get"
-SET = "set"
-ACTION = "action"
-STATUS = "status"
+import time
+
+# PROTOCOL REPLIES
+OK = 0
+ERROR = 1
+
+# PROTOCOL INSTRUCTION MESSAGES
+XML = 2
+ACCESS = 3
+TRACE = 4
+FINISHED = 5
+START = 6
+STOP = 7
+SHUTDOWN = 8
+CONFIGURE = 9
+CREATE = 10
+CREATE_SET = 11
+FACTORY_SET = 12
+CONNECT = 13
+CROSS_CONNECT = 14
+ADD_TRACE = 15
+ADD_ADDRESS = 16
+ADD_ROUTE = 17
+DO_SETUP = 18
+DO_CREATE = 19
+DO_CONNECT = 20
+DO_CONFIGURE = 21
+DO_CROSS_CONNECT = 22
+GET = 23
+SET = 24
+ACTION = 25
+STATUS = 26
+GUIDS = 27
+
+# PARAMETER TYPE
+STRING = 100
+INTEGER = 101
+BOOL = 102
+FLOAT = 103
# EXPERIMENT CONTROLER PROTOCOL MESSAGES
controller_messages = dict({
- XML: XML,
- ACCESS: "%s|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%b"),
- TRACE: "%s|%s" % (TRACE, "%d|%d|%s"),
- FINISHED: "%s|%s" % (FINISHED, "%d"),
- START: START,
- STOP: STOP,
- SHUTDOWN: SHUTDOWN,
+ XML: "%d" % XML,
+ ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
+ TRACE: "%d|%s" % (TRACE, "%d|%d|%s"),
+ FINISHED: "%d|%s" % (FINISHED, "%d"),
+ START: "%d" % START,
+ STOP: "%d" % STOP,
+ SHUTDOWN: "%d" % SHUTDOWN,
})
# TESTBED INSTANCE PROTOCOL MESSAGES
testbed_messages = dict({
- TRACE: "%s|%s" % (TRACE, "%d|%s"),
- START: "%s|%s" % (START, "%s"),
- STOP: "%s|%s" % (STOP, "%s"),
- SHUTDOWN: SHUTDOWN,
- CONFIGURE: "%s|%s" % (CONFIGURE, "%s|%s"),
- CREATE: "%s|%s" % (CREATE, "%d|%s"),
- CREATE_SET: "%s|%s" % (CREATE_SET, "%d|%s|%s"),
- FACTORY_SET: "%s|%s" % (FACTORY_SET, "%d|%s|%s"),
- CONNECT: "%s|%s" % (CONNECT, "%d|%s|%d|%s"),
- CROSS_CONNECT: "%s|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
- ADD_TRACE: "%s|%s" % (ADD_TRACE, "%d|%s"),
- ADD_ADDRESS: "%s|%s" % (ADD_ADDRESS, "%d|%d|%s|%d|%s"),
- ADD_ROUTE: "%s|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
- DO_SETUP: DO_SETUP,
- DO_CREATE: DO_CREATE,
- DO_CONNECT: DO_CONNECT,
- DO_CONFIGURE: DO_CONFIGURE,
- DO_CROSS_CONNECT: DO_CROSS_CONNECT,
- GET: "%s|%s" % (GET, "%s|%d|%s"),
- SET: "%s|%s" % (SET, "%s|%d|%s|%s"),
- ACTION: "%s|%s" % (ACTION, "%s|%d|%s"),
- STATUS: "%s|%s" % (STATUS, "%d"),
+ TRACE: "%d|%s" % (TRACE, "%d|%s"),
+ START: "%d|%s" % (START, "%s"),
+ STOP: "%d|%s" % (STOP, "%s"),
+ SHUTDOWN: "%d" % SHUTDOWN,
+ CONFIGURE: "%d|%s" % (CONFIGURE, "%s|%s|%d"),
+ CREATE: "%d|%s" % (CREATE, "%d|%s"),
+ CREATE_SET: "%d|%s" % (CREATE_SET, "%d|%s|%s|%d"),
+ FACTORY_SET: "%d|%s" % (FACTORY_SET, "%d|%s|%s|%d"),
+ CONNECT: "%d|%s" % (CONNECT, "%d|%s|%d|%s"),
+ CROSS_CONNECT: "%d|%s" % (CROSS_CONNECT, "%d|%s|%d|%d|%s|%s"),
+ ADD_TRACE: "%d|%s" % (ADD_TRACE, "%d|%s"),
+ ADD_ADDRESS: "%d|%s" % (ADD_ADDRESS, "%d|%d|%s|%d|%s"),
+ ADD_ROUTE: "%d|%s" % (ADD_ROUTE, "%d|%s|%d|%s"),
+ DO_SETUP: "%d" % DO_SETUP,
+ DO_CREATE: "%d" % DO_CREATE,
+ DO_CONNECT: "%d" % DO_CONNECT,
+ DO_CONFIGURE: "%d" % DO_CONFIGURE,
+ DO_CROSS_CONNECT: "%d" % DO_CROSS_CONNECT,
+ GET: "%d|%s" % (GET, "%s|%d|%s"),
+ SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
+ ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
+ STATUS: "%d|%s" % (STATUS, "%d"),
+ GUIDS: "%d" % GUIDS,
+ })
+
+instruction_text = dict({
+ OK: "OK",
+ ERROR: "ERROR",
+ XML: "XML",
+ ACCESS: "ACCESS",
+ TRACE: "TRACE",
+ FINISHED: "FINISHED",
+ START: "START",
+ STOP: "STOP",
+ SHUTDOWN: "SHUTDOWN",
+ CONFIGURE: "CONFIGURE",
+ CREATE: "CREATE",
+ CREATE_SET: "CREATE_SET",
+ FACTORY_SET: "FACTORY_SET",
+ CONNECT: "CONNECT",
+ CROSS_CONNECT: "CROSS_CONNECT",
+ ADD_TRACE: "ADD_TRACE",
+ ADD_ADDRESS: "ADD_ADDRESS",
+ ADD_ROUTE: "ADD_ROUTE",
+ DO_SETUP: "DO_SETUP",
+ DO_CREATE: "DO_CREATE",
+ DO_CONNECT: "DO_CONNECT",
+ DO_CONFIGURE: "DO_CONFIGURE",
+ DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
+ GET: "GET",
+ SET: "SET",
+ ACTION: "ACTION",
+ STATUS: "STATUS",
+ GUIDS: "GUIDS",
+ STRING: "STRING",
+ INTEGER: "INTEGER",
+ BOOL: "BOOL",
+ FLOAT: "FLOAT"
})
+def get_type(value):
+ if isinstance(value, bool):
+ return BOOL
+ elif isinstance(value, int):
+ return INTEGER
+ elif isinstance(value, float):
+ return FLOAT
+ else:
+ return STRING
+
+def set_type(type, value):
+ if type == INTEGER:
+ value = int(value)
+ elif type == FLOAT:
+ value = float(value)
+ elif type == BOOL:
+ value = value == "True"
+ else:
+ value = str(value)
+ return value
+
+def log_msg(server, params):
+ instr = int(params[0])
+ instr_txt = instruction_text[instr]
+ server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
+ instr_txt, ", ".join(map(str, params[1:]))))
+
+def log_reply(server, reply):
+ res = reply.split("|")
+ code = int(res[0])
+ code_txt = instruction_text[code]
+ txt = base64.b64decode(res[1])
+ server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
+ code_txt, txt))
+
+def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
+ # launch daemon
+ proc = server.popen_ssh_subprocess(python_code, host = host,
+ port = port, user = user, agent = agent)
+ #while not proc.poll():
+ # time.sleep(0.5)
+ if proc.poll():
+ err = proc.stderr.read()
+ raise RuntimeError("Client could not be executed: %s" % \
+ err)
+ # create client
+ return server.Client(root_dir, host = host, port = port, user = user,
+ agent = agent)
+
+def to_server_log_level(log_level):
+ return server.DEBUG_LEVEL \
+ if log_level == AccessConfiguration.DEBUG_LEVEL \
+ else server.ERROR_LEVEL
+
+def get_access_config_params(access_config):
+ root_dir = access_config.get_attribute_value("rootDirectory")
+ log_level = access_config.get_attribute_value("logLevel")
+ log_level = to_server_log_level(log_level)
+ user = host = port = agent = None
+ communication = access_config.get_attribute_value("communication")
+ if communication == AccessConfiguration.ACCESS_SSH:
+ user = access_config.get_attribute_value("user")
+ host = access_config.get_attribute_value("host")
+ port = access_config.get_attribute_value("port")
+ agent = access_config.get_attribute_value("useAgent")
+ return (root_dir, log_level, user, host, port, agent)
+
class AccessConfiguration(AttributesMap):
MODE_SINGLE_PROCESS = "SINGLE"
MODE_DAEMON = "DAEMON"
ACCESS_SSH = "SSH"
ACCESS_LOCAL = "LOCAL"
+ ERROR_LEVEL = "Error"
+ DEBUG_LEVEL = "Debug"
def __init__(self):
super(AccessConfiguration, self).__init__()
AccessConfiguration.ACCESS_SSH],
validation_function = validation.is_enum)
self.add_attribute(name = "host",
- help = "Host where the instance will be executed",
+ help = "Host where the testbed will be executed",
type = Attribute.STRING,
value = "localhost",
validation_function = validation.is_string)
self.add_attribute(name = "user",
- help = "User on the Host to execute the instance",
+ help = "User on the Host to execute the testbed",
type = Attribute.STRING,
+ value = getpass.getuser(),
validation_function = validation.is_string)
self.add_attribute(name = "port",
help = "Port on the Host",
type = Attribute.BOOL,
value = False,
validation_function = validation.is_bool)
+ self.add_attribute(name = "logLevel",
+ help = "Log level for instance",
+ type = Attribute.ENUM,
+ value = AccessConfiguration.ERROR_LEVEL,
+ allowed = [AccessConfiguration.ERROR_LEVEL,
+ AccessConfiguration.DEBUG_LEVEL],
+ validation_function = validation.is_enum)
def create_controller(xml, access_config = None):
- from nepi.core.execute import ExperimentController
- mode = None if not access_config else access_config.get_attribute_value("mode")
+ mode = None if not access_config else \
+ access_config.get_attribute_value("mode")
if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
- return ExperimentController(xml)
- elif mode ==AccessConfiguration.MODE_DAEMON:
- root_dir = access_config.get_attribute_value("rootDirectory")
- return ExperimentControllerProxy(xml, root_dir)
+ from nepi.core.execute import ExperimentController
+ return ExperimentController(xml)
+ elif mode == AccessConfiguration.MODE_DAEMON:
+ (root_dir, log_level, user, host, port, agent) = \
+ get_access_config_params(access_config)
+ return ExperimentControllerProxy(root_dir, log_level,
+ experiment_xml = xml, host = host, port = port, user = user,
+ agent = agent)
raise RuntimeError("Unsupported access configuration 'mode'" % mode)
def create_testbed_instance(testbed_id, testbed_version, access_config):
if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
return _build_testbed_instance(testbed_id, testbed_version)
elif mode == AccessConfiguration.MODE_DAEMON:
- root_dir = access_config.get_attribute_value("rootDirectory")
- return TestbedIntanceProxy(testbed_id, testbed_version, root_dir)
+ (root_dir, log_level, user, host, port, agent) = \
+ get_access_config_params(access_config)
+ return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id,
+ testbed_version = testbed_version, host = host, port = port,
+ user = user, agent = agent)
raise RuntimeError("Unsupported access configuration 'mode'" % mode)
def _build_testbed_instance(testbed_id, testbed_version):
module = sys.modules[mod_name]
return module.TestbedInstance(testbed_version)
+class TestbedInstanceServer(server.Server):
+ def __init__(self, root_dir, log_level, testbed_id, testbed_version):
+ super(TestbedInstanceServer, self).__init__(root_dir, log_level)
+ self._testbed_id = testbed_id
+ self._testbed_version = testbed_version
+ self._testbed = None
+
+ def post_daemonize(self):
+ self._testbed = _build_testbed_instance(self._testbed_id,
+ self._testbed_version)
+
+ def reply_action(self, msg):
+ params = msg.split("|")
+ instruction = int(params[0])
+ log_msg(self, params)
+ try:
+ if instruction == TRACE:
+ reply = self.trace(params)
+ elif instruction == START:
+ reply = self.start(params)
+ elif instruction == STOP:
+ reply = self.stop(params)
+ elif instruction == SHUTDOWN:
+ reply = self.shutdown(params)
+ elif instruction == CONFIGURE:
+ reply = self.configure(params)
+ elif instruction == CREATE:
+ reply = self.create(params)
+ elif instruction == CREATE_SET:
+ reply = self.create_set(params)
+ elif instruction == FACTORY_SET:
+ reply = self.factory_set(params)
+ elif instruction == CONNECT:
+ reply = self.connect(params)
+ elif instruction == CROSS_CONNECT:
+ reply = self.cross_connect(params)
+ elif instruction == ADD_TRACE:
+ reply = self.add_trace(params)
+ elif instruction == ADD_ADDRESS:
+ reply = self.add_address(params)
+ elif instruction == ADD_ROUTE:
+ reply = self.add_route(params)
+ elif instruction == DO_SETUP:
+ reply = self.do_setup(params)
+ elif instruction == DO_CREATE:
+ reply = self.do_create(params)
+ elif instruction == DO_CONNECT:
+ reply = self.do_connect(params)
+ elif instruction == DO_CONFIGURE:
+ reply = self.do_configure(params)
+ elif instruction == DO_CROSS_CONNECT:
+ reply = self.do_cross_connect(params)
+ elif instruction == GET:
+ reply = self.get(params)
+ elif instruction == SET:
+ reply = self.set(params)
+ elif instruction == ACTION:
+ reply = self.action(params)
+ elif instruction == STATUS:
+ reply = self.status(params)
+ elif instruction == GUIDS:
+ reply = self.guids(params)
+ else:
+ error = "Invalid instruction %s" % instruction
+ self.log_error(error)
+ result = base64.b64encode(error)
+ reply = "%d|%s" % (ERROR, result)
+ except:
+ error = self.log_error()
+ result = base64.b64encode(error)
+ reply = "%d|%s" % (ERROR, result)
+ log_reply(self, reply)
+ return reply
+
+ def guids(self, params):
+ guids = self._testbed.guids
+ guids = ",".join(map(str, guids))
+ result = base64.b64encode(guids)
+ return "%d|%s" % (OK, result)
+
+ def create(self, params):
+ guid = int(params[1])
+ factory_id = params[2]
+ self._testbed.create(guid, factory_id)
+ return "%d|%s" % (OK, "")
+
+ def trace(self, params):
+ guid = int(params[1])
+ trace_id = params[2]
+ trace = self._testbed.trace(guid, trace_id)
+ result = base64.b64encode(trace)
+ return "%d|%s" % (OK, result)
+
+ def start(self, params):
+ time = params[1]
+ self._testbed.start(time)
+ return "%d|%s" % (OK, "")
+
+ def stop(self, params):
+ time = params[1]
+ self._testbed.stop(time)
+ return "%d|%s" % (OK, "")
+
+ def shutdown(self, params):
+ self._testbed.shutdown()
+ return "%d|%s" % (OK, "")
+
+ def configure(self, params):
+ name = base64.b64decode(params[1])
+ value = base64.b64decode(params[2])
+ type = int(params[3])
+ value = set_type(type, value)
+ self._testbed.configure(name, value)
+ return "%d|%s" % (OK, "")
+
+ def create_set(self, params):
+ guid = int(params[1])
+ name = base64.b64decode(params[2])
+ value = base64.b64decode(params[3])
+ type = int(params[4])
+ value = set_type(type, value)
+ self._testbed.create_set(guid, name, value)
+ return "%d|%s" % (OK, "")
+
+ def factory_set(self, params):
+ name = base64.b64decode(params[1])
+ value = base64.b64decode(params[2])
+ type = int(params[3])
+ value = set_type(type, value)
+ self._testbed.factory_set(name, value)
+ return "%d|%s" % (OK, "")
+
+ def connect(self, params):
+ guid1 = int(params[1])
+ connector_type_name1 = params[2]
+ guid2 = int(params[3])
+ connector_type_name2 = params[4]
+ self._testbed.connect(guid1, connector_type_name1, guid2,
+ connector_type_name2)
+ return "%d|%s" % (OK, "")
+
+ def cross_connect(self, params):
+ guid = int(params[1])
+ connector_type_name = params[2]
+ cross_guid = int(params[3])
+ connector_type_name = params[4]
+ cross_guid = int(params[5])
+ cross_testbed_id = params[6]
+ cross_factory_id = params[7]
+ cross_connector_type_name = params[8]
+ self._testbed.cross_connect(guid, connector_type_name, cross_guid,
+ cross_testbed_id, cross_factory_id, cross_connector_type_name)
+ return "%d|%s" % (OK, "")
+
+ def add_trace(self, params):
+ guid = int(params[1])
+ trace_id = params[2]
+ self._testbed.add_trace(guid, trace_id)
+ return "%d|%s" % (OK, "")
+
+ def add_address(self, params):
+ guid = int(params[1])
+ family = int(params[2])
+ address = params[3]
+ netprefix = int(params[4])
+ broadcast = params[5]
+ self._testbed.add_address(guid, family, address, netprefix,
+ broadcast)
+ return "%d|%s" % (OK, "")
+
+ def add_route(self, params):
+ guid = int(params[1])
+ destination = params[2]
+ netprefix = int(params[3])
+ nexthop = params[4]
+ self._testbed.add_route(guid, destination, netprefix, nexthop)
+ return "%d|%s" % (OK, "")
+
+ def do_setup(self, params):
+ self._testbed.do_setup()
+ return "%d|%s" % (OK, "")
+
+ def do_create(self, params):
+ self._testbed.do_create()
+ return "%d|%s" % (OK, "")
+
+ def do_connect(self, params):
+ self._testbed.do_connect()
+ return "%d|%s" % (OK, "")
+
+ def do_configure(self, params):
+ self._testbed.do_configure()
+ return "%d|%s" % (OK, "")
+
+ def do_cross_connect(self, params):
+ self._testbed.do_cross_connect()
+ return "%d|%s" % (OK, "")
+
+ def get(self, params):
+ time = params[1]
+ guid = int(param[2] )
+ name = base64.b64decode(params[3])
+ value = self._testbed.get(time, guid, name)
+ result = base64.b64encode(str(value))
+ return "%d|%s" % (OK, result)
+
+ def set(self, params):
+ time = params[1]
+ guid = int(params[2])
+ name = base64.b64decode(params[3])
+ value = base64.b64decode(params[4])
+ type = int(params[3])
+ value = set_type(type, value)
+ self._testbed.set(time, guid, name, value)
+ return "%d|%s" % (OK, "")
+
+ def action(self, params):
+ time = params[1]
+ guid = int(params[2])
+ command = base64.b64decode(params[3])
+ self._testbed.action(time, guid, command)
+ return "%d|%s" % (OK, "")
+
+ def status(self, params):
+ guid = int(params[1])
+ status = self._testbed.status(guid)
+ result = base64.b64encode(str(status))
+ return "%d|%s" % (OK, result)
+
+class ExperimentControllerServer(server.Server):
+ def __init__(self, root_dir, log_level, experiment_xml):
+ super(ExperimentControllerServer, self).__init__(root_dir, log_level)
+ self._experiment_xml = experiment_xml
+ self._controller = None
+
+ def post_daemonize(self):
+ from nepi.core.execute import ExperimentController
+ self._controller = ExperimentController(self._experiment_xml)
+
+ def reply_action(self, msg):
+ params = msg.split("|")
+ instruction = int(params[0])
+ log_msg(self, params)
+ try:
+ if instruction == XML:
+ reply = self.experiment_xml(params)
+ elif instruction == ACCESS:
+ reply = self.set_access_configuration(params)
+ elif instruction == TRACE:
+ reply = self.trace(params)
+ elif instruction == FINISHED:
+ reply = self.is_finished(params)
+ elif instruction == START:
+ reply = self.start(params)
+ elif instruction == STOP:
+ reply = self.stop(params)
+ elif instruction == SHUTDOWN:
+ reply = self.shutdown(params)
+ else:
+ error = "Invalid instruction %s" % instruction
+ self.log_error(error)
+ result = base64.b64encode(error)
+ reply = "%d|%s" % (ERROR, result)
+ except:
+ error = self.log_error()
+ result = base64.b64encode(error)
+ reply = "%d|%s" % (ERROR, result)
+ log_reply(self, reply)
+ return reply
+
+ def experiment_xml(self, params):
+ xml = self._controller.experiment_xml
+ result = base64.b64encode(xml)
+ return "%d|%s" % (OK, result)
+
+ def set_access_configuration(self, params):
+ testbed_guid = int(params[1])
+ mode = params[2]
+ communication = params[3]
+ host = params[4]
+ user = params[5]
+ port = int(params[6])
+ root_dir = params[7]
+ use_agent = params[8] == "True"
+ log_level = params[9]
+ access_config = AccessConfiguration()
+ access_config.set_attribute_value("mode", mode)
+ access_config.set_attribute_value("communication", communication)
+ access_config.set_attribute_value("host", host)
+ access_config.set_attribute_value("user", user)
+ access_config.set_attribute_value("port", port)
+ access_config.set_attribute_value("rootDirectory", root_dir)
+ access_config.set_attribute_value("useAgent", use_agent)
+ access_config.set_attribute_value("logLevel", log_level)
+ self._controller.set_access_configuration(testbed_guid,
+ access_config)
+ return "%d|%s" % (OK, "")
+
+ def trace(self, params):
+ testbed_guid = int(params[1])
+ guid = int(params[2])
+ trace_id = params[3]
+ trace = self._controller.trace(testbed_guid, guid, trace_id)
+ result = base64.b64encode(trace)
+ return "%d|%s" % (OK, result)
+
+ def is_finished(self, params):
+ guid = int(params[1])
+ status = self._controller.is_finished(guid)
+ result = base64.b64encode(str(status))
+ return "%d|%s" % (OK, result)
+
+ def start(self, params):
+ self._controller.start()
+ return "%d|%s" % (OK, "")
+
+ def stop(self, params):
+ self._controller.stop()
+ return "%d|%s" % (OK, "")
+
+ def shutdown(self, params):
+ self._controller.shutdown()
+ return "%d|%s" % (OK, "")
+
class TestbedIntanceProxy(object):
- def __init__(self, testbed_id, testbed_version, root_dir):
- # launch daemon
- server = server.TestbedInstanceServer(testbed_id, testbed_version,
- root_dir)
- server.run()
- # create_client
- self._client = server.Client(root_dir)
+ def __init__(self, root_dir, log_level, testbed_id = None,
+ testbed_version = None, launch = True, host = None,
+ port = None, user = None, agent = None):
+ if launch:
+ if testbed_id == None or testbed_version == None:
+ raise RuntimeError("To launch a TesbedInstance server a \
+ testbed_id and testbed_version are required")
+ # ssh
+ if host != None:
+ python_code = "from nepi.util.proxy import \
+ TesbedInstanceServer;\
+ s = TestbedInstanceServer('%s', %d, '%s', '%s');\
+ s.run()" % (root_dir, log_level, testbed_id,
+ testbed_version)
+ self._client = launch_ssh_daemon_client(root_dir, python_code,
+ host, port, user, agent)
+ else:
+ # launch daemon
+ s = TestbedInstanceServer(root_dir, log_level, testbed_id,
+ testbed_version)
+ s.run()
+ # create client
+ self._client = server.Client(root_dir)
+
+ @property
+ def guids(self):
+ msg = testbed_messages[GUIDS]
+ self._client.send_msg(msg)
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ return map(int, text.split(","))
def configure(self, name, value):
- msg = testbed_messages(CONFIGURE)
- msg = msg % (name, value)
+ msg = testbed_messages[CONFIGURE]
+ type = get_type(value)
+ # avoid having "|" in this parameters
+ name = base64.b64encode(name)
+ value = base64.b64encode(str(value))
+ msg = msg % (name, value, type)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def create(self, guid, factory_id):
- msg = testbed_messages(CREATE)
+ msg = testbed_messages[CREATE]
msg = msg % (guid, factory_id)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def create_set(self, guid, name, value):
- msg = testbed_messages(CREATE_SET)
- msg = msg % (guid, name, value)
+ msg = testbed_messages[CREATE_SET]
+ type = get_type(value)
+ # avoid having "|" in this parameters
+ name = base64.b64encode(name)
+ value = base64.b64encode(str(value))
+ msg = msg % (guid, name, value, type)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def factory_set(self, guid, name, value):
- msg = testbed_messages(FACTORY_SET)
- msg = msg % (guid, name, value)
+ msg = testbed_messages[FACTORY_SET]
+ type = get_type(value)
+ # avoid having "|" in this parameters
+ name = base64.b64encode(name)
+ value = base64.b64encode(str(value))
+ msg = msg % (guid, name, value, type)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def connect(self, guid1, connector_type_name1, guid2,
connector_type_name2):
- msg = testbed_messages(CONNECT)
+ msg = testbed_messages[CONNECT]
msg = msg % (guid1, connector_type_name1, guid2,
connector_type_name2)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def cross_connect(self, guid, connector_type_name, cross_guid,
cross_testbed_id, cross_factory_id, cross_connector_type_name):
- msg = testbed_messages(CROSS_CONNECT)
+ msg = testbed_messages[CROSS_CONNECT]
msg = msg % (guid, connector_type_name, cross_guid,
cross_testbed_id, cross_factory_id, cross_connector_type_name)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def add_trace(self, guid, trace_id):
- msg = testbed_messages(ADD_TRACE)
+ msg = testbed_messages[ADD_TRACE]
msg = msg % (guid, trace_id)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def add_address(self, guid, family, address, netprefix, broadcast):
- msg = testbed_messages(ADD_ADDRESS)
+ msg = testbed_messages[ADD_ADDRESS]
msg = msg % (guid, family, address, netprefix, broadcast)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def add_route(self, guid, destination, netprefix, nexthop):
- msg = testbed_messages(ADD_ROUTE)
+ msg = testbed_messages[ADD_ROUTE]
msg = msg % (guid, destination, netprefix, nexthop)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def do_setup(self):
- msg = testbed_messages(DO_SETUP)
+ msg = testbed_messages[DO_SETUP]
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def do_create(self):
- msg = testbed_messages(DO_CREATE)
+ msg = testbed_messages[DO_CREATE]
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def do_connect(self):
- msg = testbed_messages(DO_CONNECT)
+ msg = testbed_messages[DO_CONNECT]
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def do_configure(self):
- msg = testbed_messages(DO_CONFIGURE)
+ msg = testbed_messages[DO_CONFIGURE]
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def do_cross_connect(self):
- msg = testbed_messages(DO_CROSS_CONNECT)
+ msg = testbed_messages[DO_CROSS_CONNECT]
self._client.send_msg(msg)
- return self._client.read_reply()
-
- def start(self, time):
- msg = testbed_messages(START)
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+
+ def start(self, time = TIME_NOW):
+ msg = testbed_messages[START]
msg = msg % (time)
self._client.send_msg(msg)
- return self._client.read_reply()
-
- def stop(self, time):
- msg = testbed_messages(STOP)
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+
+ def stop(self, time = TIME_NOW):
+ msg = testbed_messages[STOP]
msg = msg % (time)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def set(self, time, guid, name, value):
- msg = testbed_messages(SET)
- msg = msg % (time, guid, name, value)
+ msg = testbed_messages[SET]
+ type = get_type(value)
+ # avoid having "|" in this parameters
+ name = base64.b64encode(name)
+ value = base64.b64encode(str(value))
+ msg = msg % (time, guid, name, value, type)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def get(self, time, guid, name):
- msg = testbed_messages(GET)
+ msg = testbed_messages[GET]
+ # avoid having "|" in this parameters
+ name = base64.b64encode(name)
msg = msg % (time, guid, name)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ return text
def action(self, time, guid, action):
- msg = testbed_messages(ACTION)
+ msg = testbed_messages[ACTION]
msg = msg % (time, guid, action)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def status(self, guid):
- msg = testbed_messages(STATUS)
+ msg = testbed_messages[STATUS]
msg = msg % (guid)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ return int(text)
def trace(self, guid, trace_id):
- msg = testbed_messages(TRACE)
+ msg = testbed_messages[TRACE]
msg = msg % (guid, trace_id)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ return text
def shutdown(self):
- msg = testbed_messages(SHUTDOWN)
+ msg = testbed_messages[SHUTDOWN]
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ self._client.send_stop()
class ExperimentControllerProxy(object):
- def __init__(self, root_dir):
- # launch daemon
- server = server.ExperimentControllerServer(root_dir)
- server.run()
- # create_client
- self._client = server.Client(root_dir)
+ def __init__(self, root_dir, log_level, experiment_xml = None,
+ launch = True, host = None, port = None, user = None,
+ agent = None):
+ if launch:
+ # launch server
+ if experiment_xml == None:
+ raise RuntimeError("To launch a ExperimentControllerServer a \
+ xml description of the experiment is required")
+ # ssh
+ if host != None:
+ xml = experiment_xml
+ xml = xml.replace("'", r"\'")
+ xml = xml.replace("\"", r"\'")
+ xml = xml.replace("\n", r"")
+ python_code = "from nepi.util.proxy import ExperimentControllerServer;\
+ s = ExperimentControllerServer('%s', %d, '%s');\
+ s.run()" % (root_dir, log_level, xml)
+ self._client = launch_ssh_daemon_client(root_dir, python_code,
+ host, port, user, agent)
+ else:
+ # launch daemon
+ s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
+ s.run()
+ # create client
+ self._client = server.Client(root_dir)
@property
def experiment_xml(self):
- msg = controller_messages(XML)
+ msg = controller_messages[XML]
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ return text
def set_access_configuration(self, testbed_guid, access_config):
mode = access_config.get_attribute_value("mode")
port = access_config.get_attribute_value("port")
root_dir = access_config.get_attribute_value("rootDirectory")
use_agent = access_config.get_attribute_value("useAgent")
- msg = controller_messages(ACCESS)
+ log_level = access_config.get_attribute_value("logLevel")
+ msg = controller_messages[ACCESS]
msg = msg % (testbed_guid, mode, communication, host, user, port,
- root_dir, use_agent)
+ root_dir, use_agent, log_level)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def trace(self, testbed_guid, guid, trace_id):
- msg = controller_messages(TRACE)
+ msg = controller_messages[TRACE]
msg = msg % (testbed_guid, guid, trace_id)
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == OK:
+ return text
+ raise RuntimeError(text)
def start(self):
- msg = controller_messages(START)
+ msg = controller_messages[START]
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def stop(self):
- msg = controller_messages(STOP)
+ msg = controller_messages[STOP]
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
def is_finished(self, guid):
- msg = controller_messages(FINISED)
+ msg = controller_messages[FINISHED]
msg = msg % guid
self._client.send_msg(msg)
- return self._client.read_reply()
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ return text == "True"
def shutdown(self):
- msg = controller_messages(SHUTDOWN)
+ msg = controller_messages[SHUTDOWN]
self._client.send_msg(msg)
- return self._client.read_reply()
-
-class TestbedInstanceServer(server.Server):
- def __init__(self, testbed_id, testbed_version, root_dir):
- super(TestbedInstanceServer, self).__init__(root_dir)
- self._testbed_id = testbed_id
- self._testbed_version = testbed_version
- self._instance = None
-
- def post_daemonize(self):
- self._instance = _build_testbed_instance(self._testbed_id,
- self._testbed_version)
-
- def reply_action(self, msg):
- return "Reply to: %s" % msg
-
-class ExperimentControllerServer(server.Server):
- def __init__(self, xml, root_dir):
- super(TestbedInstanceServer, self).__init__(root_dir)
- self._xml = xml
- self._controller = None
-
- def post_daemonize(self):
- self._controller = ExperimentController(self._xml)
-
- def reply_action(self, msg):
- return "Reply to: %s" % msg
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ self._client.send_stop()