import sys
import re
import threading
+import ConfigParser
+import os
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
raise NotImplementedError
class ExperimentController(object):
- def __init__(self, experiment_xml):
+ def __init__(self, experiment_xml, root_dir):
self._experiment_xml = experiment_xml
self._testbeds = dict()
self._access_config = dict()
self._netrefs = dict()
+ self._root_dir = root_dir
@property
def experiment_xml(self):
def start(self):
self._create_testbed_instances()
+ # persist testbed connection data, for potential recovery
+ self._persist_testbed_proxies()
+
# perform setup in parallel for all test beds,
# wait for all threads to finish
self._parallel([testbed.do_setup
self._parallel([testbed.start
for testbed in self._testbeds.itervalues()])
+ def _persist_testbed_proxies(self):
+ TRANSIENT = ('Recover',)
+
+ # persist access configuration for all testbeds, so that
+ # recovery mode can reconnect to them if it becomes necessary
+ conf = ConfigParser.RawConfigParser()
+ for testbed_guid, testbed_config in self._access_config.iteritems():
+ testbed_guid = str(testbed_guid)
+ conf.add_section(testbed_guid)
+ for attr in testbed_config.attributes_name:
+ if attr not in TRANSIENT:
+ conf.set(testbed_guid, attr,
+ testbed_config.get_attribute_value(attr))
+
+ f = open(os.path.join(self._root_dir, 'access_config.ini'), 'w')
+ conf.write(f)
+ f.close()
+
+ def _load_testbed_proxies(self):
+ TYPEMAP = {
+ STRING : 'get',
+ INTEGER : 'getint',
+ FLOAT : 'getfloat',
+ BOOLEAN : 'getboolean',
+ }
+
+ conf = ConfigParser.RawConfigParser()
+ conf.read(os.path.join(self._root_dir, 'access_config.ini'))
+ for testbed_guid in conf.sections():
+ testbed_config = proxy.AccessConfiguration()
+ for attr in conf.options(testbed_guid):
+ testbed_config.set_attribute_value(attr,
+ conf.get(testbed_guid, attr) )
+
+ testbed_guid = str(testbed_guid)
+ conf.add_section(testbed_guid)
+ for attr in testbed_config.attributes_name:
+ if attr not in TRANSIENT:
+ getter = getattr(conf, TYPEMAP.get(
+ testbed_config.get_attribute_type(attr),
+ 'get') )
+ testbed_config.set_attribute_value(
+ testbed_guid, attr, getter(attr))
+
+ def _unpersist_testbed_proxies(self):
+ try:
+ os.remove(os.path.join(self._root_dir, 'access_config.ini'))
+ except:
+ # Just print exceptions, this is just cleanup
+ import traceback
+ traceback.print_exc(file=sys.stderr)
+
def stop(self):
for testbed in self._testbeds.values():
testbed.stop()
+ self._unpersist_testbed_proxies()
+
+ def recover(self):
+ # reload perviously persisted testbed access configurations
+ self._load_testbed_proxies()
+
+ # recreate testbed proxies by reconnecting only
+ self._create_testbed_instances(recover=True)
def is_finished(self, guid):
for testbed in self._testbeds.values():
if fail_if_undefined:
raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
- def _create_testbed_instances(self):
+ def _create_testbed_instances(self, recover = False):
parser = XmlExperimentParser()
data = parser.from_xml_to_data(self._experiment_xml)
element_guids = list()
(testbed_id, testbed_version) = data.get_testbed_data(guid)
access_config = None if guid not in self._access_config else\
self._access_config[guid]
+
+ if recover and access_config is None:
+ # need to create one
+ access_config = self._access_config[guid] = proxy.AccessConfiguration()
+ if access_config is not None:
+ # force recovery mode
+ access_config.set_attribute_value("recover",recover)
+
testbed = proxy.create_testbed_instance(testbed_id,
testbed_version, access_config)
for (name, value) in data.get_attribute_data(guid):
# (which could require high-latency network I/O)
(testbed_guid, factory_id) = data.get_box_data(guid)
netrefs.setdefault((testbed_guid,guid),set()).add(name)
- self._program_testbed_instances(element_guids, data)
+ if not recover:
+ self._program_testbed_instances(element_guids, data)
def _program_testbed_instances(self, element_guids, data):
for guid in element_guids:
import getpass
import sys
import time
+import tempfile
+import shutil
# PROTOCOL REPLIES
OK = 0
GUIDS = 27
GET_ROUTE = 28
GET_ADDRESS = 29
+RECOVER = 30
# PARAMETER TYPE
STRING = 100
FINISHED: "%d|%s" % (FINISHED, "%d"),
START: "%d" % START,
STOP: "%d" % STOP,
+ RECOVER : "%d" % RECOVER,
SHUTDOWN: "%d" % SHUTDOWN,
})
FINISHED: "FINISHED",
START: "START",
STOP: "STOP",
+ RECOVER: "RECOVER",
SHUTDOWN: "SHUTDOWN",
CONFIGURE: "CONFIGURE",
CREATE: "CREATE",
allowed = [AccessConfiguration.ERROR_LEVEL,
AccessConfiguration.DEBUG_LEVEL],
validation_function = validation.is_enum)
+ self.add_attribute(name = "recover",
+ help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
+ type = Attribute.BOOL,
+ value = False,
+ validation_function = validation.is_bool)
+
+class TempDir(object):
+ def __init__(self):
+ self.path = tempfile.mkdtemp()
+
+ def __del__(self):
+ shutil.rmtree(self.path)
+
+class PermDir(object):
+ def __init__(self, path):
+ self.path = path
def create_controller(xml, access_config = None):
- mode = None if not access_config else \
- access_config.get_attribute_value("mode")
+ mode = None if not access_config \
+ else access_config.get_attribute_value("mode")
+ launch = True if not access_config \
+ else not access_config.get_attribute_value("recover")
if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
+ if not launch:
+ raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
+
from nepi.core.execute import ExperimentController
- return ExperimentController(xml)
+
+ if not access_config or not access_config.has_attribute("rootDirectory"):
+ root_dir = TempDir()
+ else:
+ root_dir = PermDir(access_config.get_attribute_value("rootDirectory"))
+ controller = ExperimentController(xml, root_dir.path)
+
+ # inject reference to temporary dir, so that it gets cleaned
+ # up at destruction time.
+ controller._tempdir = root_dir
+
+ return controller
elif mode == AccessConfiguration.MODE_DAEMON:
(root_dir, log_level, user, host, port, agent) = \
get_access_config_params(access_config)
return ExperimentControllerProxy(root_dir, log_level,
experiment_xml = xml, host = host, port = port, user = user,
- agent = agent)
- raise RuntimeError("Unsupported access configuration 'mode'" % mode)
+ agent = agent, launch = launch)
+ raise RuntimeError("Unsupported access configuration '%s'" % mode)
def create_testbed_instance(testbed_id, testbed_version, access_config):
- mode = None if not access_config else access_config.get_attribute_value("mode")
+ mode = None if not access_config \
+ else access_config.get_attribute_value("mode")
+ launch = True if not access_config \
+ else not access_config.get_attribute_value("recover")
if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
+ if not launch:
+ raise ValueError, "Unsupported instantiation mode: %s with lanch=False" % (mode,)
return _build_testbed_instance(testbed_id, testbed_version)
elif mode == AccessConfiguration.MODE_DAEMON:
(root_dir, log_level, user, host, port, agent) = \
get_access_config_params(access_config)
return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id,
testbed_version = testbed_version, host = host, port = port,
- user = user, agent = agent)
- raise RuntimeError("Unsupported access configuration 'mode'" % mode)
+ user = user, agent = agent, launch = launch)
+ raise RuntimeError("Unsupported access configuration '%s'" % mode)
def _build_testbed_instance(testbed_id, testbed_version):
mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
def post_daemonize(self):
from nepi.core.execute import ExperimentController
- self._controller = ExperimentController(self._experiment_xml)
+ self._controller = ExperimentController(self._experiment_xml,
+ root_dir = self._root_dir)
def reply_action(self, msg):
- params = msg.split("|")
- instruction = int(params[0])
- log_msg(self, params)
- try:
- if instruction == XML:
- reply = self.experiment_xml(params)
- elif instruction == ACCESS:
- reply = self.set_access_configuration(params)
- elif instruction == TRACE:
- reply = self.trace(params)
- elif instruction == FINISHED:
- reply = self.is_finished(params)
- elif instruction == START:
- reply = self.start(params)
- elif instruction == STOP:
- reply = self.stop(params)
- elif instruction == SHUTDOWN:
- reply = self.shutdown(params)
- else:
- error = "Invalid instruction %s" % instruction
- self.log_error(error)
+ if not msg:
+ result = base64.b64encode("Invalid command line")
+ reply = "%d|%s" % (ERROR, result)
+ else:
+ params = msg.split("|")
+ instruction = int(params[0])
+ log_msg(self, params)
+ try:
+ if instruction == XML:
+ reply = self.experiment_xml(params)
+ elif instruction == ACCESS:
+ reply = self.set_access_configuration(params)
+ elif instruction == TRACE:
+ reply = self.trace(params)
+ elif instruction == FINISHED:
+ reply = self.is_finished(params)
+ elif instruction == START:
+ reply = self.start(params)
+ elif instruction == STOP:
+ reply = self.stop(params)
+ elif instruction == RECOVER:
+ reply = self.recover(params)
+ elif instruction == SHUTDOWN:
+ reply = self.shutdown(params)
+ else:
+ error = "Invalid instruction %s" % instruction
+ self.log_error(error)
+ result = base64.b64encode(error)
+ reply = "%d|%s" % (ERROR, result)
+ except:
+ error = self.log_error()
result = base64.b64encode(error)
reply = "%d|%s" % (ERROR, result)
- except:
- error = self.log_error()
- result = base64.b64encode(error)
- reply = "%d|%s" % (ERROR, result)
log_reply(self, reply)
return reply
self._controller.stop()
return "%d|%s" % (OK, "")
+ def recover(self, params):
+ self._controller.recover()
+ return "%d|%s" % (OK, "")
+
def shutdown(self, params):
self._controller.shutdown()
return "%d|%s" % (OK, "")
if code == ERROR:
raise RuntimeError(text)
self._client.send_stop()
+ self._client.read_reply() # wait for it
class ExperimentControllerProxy(object):
def __init__(self, root_dir, log_level, experiment_xml = None,
if code == ERROR:
raise RuntimeError(text)
+ def recover(self):
+ msg = controller_messages[RECOVER]
+ self._client.send_msg(msg)
+ reply = self._client.read_reply()
+ result = reply.split("|")
+ code = int(result[0])
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+
def is_finished(self, guid):
msg = controller_messages[FINISHED]
msg = msg % guid
if code == ERROR:
raise RuntimeError(text)
self._client.send_stop()
+ self._client.read_reply() # wait for it
sys.modules["nepi.testbeds.mock.metadata_v01"] = mock.metadata_v01
sys.modules["nepi.testbeds.mock"] = mock
self.root_dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.root_dir)
def make_test_experiment(self):
exp_desc = ExperimentDescription()
controller.stop()
controller.shutdown()
+ def test_daemonized_all_integration_recovery(self):
+ exp_desc, desc, app, node1, node2, iface1, iface2 = self.make_test_experiment()
+ xml = exp_desc.to_xml()
+ access_config = proxy.AccessConfiguration()
+ access_config.set_attribute_value("mode",
+ proxy.AccessConfiguration.MODE_DAEMON)
+ access_config.set_attribute_value("rootDirectory", self.root_dir)
+ controller = proxy.create_controller(xml, access_config)
+
+ access_config2 = proxy.AccessConfiguration()
+ access_config2.set_attribute_value("mode",
+ proxy.AccessConfiguration.MODE_DAEMON)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+ controller.set_access_configuration(desc.guid, access_config2)
+
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ fake_result = controller.trace(desc.guid, app.guid, "fake")
+ comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+ self.assertTrue(fake_result.startswith(comp_result))
+
+ # controller dies
+ del controller
+
+ # recover
+ access_config.set_attribute_value("recover",True)
+ controller = proxy.create_controller(xml, access_config)
+
+ # test recovery
+ self.assertTrue(controller.is_finished(app.guid))
+ fake_result = controller.trace(desc.guid, app.guid, "fake")
+ self.assertTrue(fake_result.startswith(comp_result))
+
+ controller.stop()
+ controller.shutdown()
+
def test_reference_expressions(self):
exp_desc, desc, app, node1, node2, iface1, iface2 = self.make_test_experiment()
controller.stop()
controller.shutdown()
- def tearDown(self):
- shutil.rmtree(self.root_dir)
-
if __name__ == '__main__':
unittest.main()