-#!/usr/bin/env python
# -*- coding: utf-8 -*-
from nepi.core.attributes import Attribute, AttributesMap
import os
import collections
import functools
+import time
+import logging
+logging.basicConfig()
-ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
+ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
"""Instructs the addition of an address"""
raise NotImplementedError
- def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
+ def defer_add_route(self, guid, destination, netprefix, nexthop,
+ metric = 0, device = None):
"""Instructs the addition of a route"""
raise NotImplementedError
self._netreffed_testbeds = set()
self._guids_in_testbed_cache = dict()
self._failed_testbeds = set()
-
+ self._started_time = None
+ self._stopped_time = None
+ self._testbed_order = []
+
+ self._logger = logging.getLogger('nepi.core.execute')
+ level = logging.ERROR
+ if os.environ.get("NEPI_CONTROLLER_LOGLEVEL",
+ DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
+ level = logging.DEBUG
+ self._logger.setLevel(level)
+
if experiment_xml is None and root_dir is not None:
# Recover
self.load_experiment_xml()
def experiment_execute_xml(self):
return self._experiment_execute_xml
+ @property
+ def started_time(self):
+ return self._started_time
+
+ @property
+ def stopped_time(self):
+ return self._stopped_time
+
@property
def guids(self):
guids = list()
def _parallel(callables):
excs = []
def wrap(callable):
- @functools.wraps(callable)
def wrapped(*p, **kw):
try:
callable(*p, **kw)
except:
- import traceback
- traceback.print_exc(file=sys.stderr)
+ logging.exception("Exception occurred in asynchronous thread:")
excs.append(sys.exc_info())
+ try:
+ wrapped = functools.wraps(callable)(wrapped)
+ except:
+ # functools.partial not wrappable
+ pass
return wrapped
threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
for thread in threads:
raise eTyp, eVal, eLoc
def start(self):
+ self._started_time = time.time()
self._start()
def _start(self, recover = False):
self._testbeds[guid].do_setup()
self._testbeds[guid].recover()
except:
+ self._logger.exception("During recovery of testbed %s", guid)
+
# Mark failed
self._failed_testbeds.add(guid)
def steps_to_configure(self, allowed_guids):
# perform setup in parallel for all test beds,
# wait for all threads to finish
+
+ self._logger.debug("ExperimentController: Starting parallel do_setup")
self._parallel([testbed.do_setup
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
# perform create-connect in parallel, wait
# (internal connections only)
+ self._logger.debug("ExperimentController: Starting parallel do_create")
self._parallel([testbed.do_create
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
+ self._logger.debug("ExperimentController: Starting parallel do_connect_init")
self._parallel([testbed.do_connect_init
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
+ self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
self._parallel([testbed.do_connect_compl
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
+ self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
self._parallel([testbed.do_preconfigure
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
self._clear_caches()
+
+ # Store testbed order
+ self._testbed_order.append(allowed_guids)
steps_to_configure(self, to_restart)
if self._netreffed_testbeds:
+ self._logger.debug("ExperimentController: Resolving netreffed testbeds")
# initally resolve netrefs
self.do_netrefs(data, fail_if_undefined=False)
self._testbeds[guid].do_setup()
self._testbeds[guid].recover()
except:
+ self._logger.exception("During recovery of testbed %s", guid)
+
# Mark failed
self._failed_testbeds.add(guid)
all_restart = [ self._testbeds[guid] for guid in all_restart ]
# final netref step, fail if anything's left unresolved
- self.do_netrefs(data, fail_if_undefined=True)
+ self._logger.debug("ExperimentController: Resolving do_netrefs")
+ self.do_netrefs(data, fail_if_undefined=False)
# Only now, that netref dependencies have been solve, it is safe to
# program cross_connections
+ self._logger.debug("ExperimentController: Programming testbed cross-connections")
self._program_testbed_cross_connections(data)
# perform do_configure in parallel for al testbeds
# (it's internal configuration for each)
+ self._logger.debug("ExperimentController: Starting parallel do_configure")
self._parallel([testbed.do_configure
for testbed in all_restart])
#time.sleep(60)
# cross-connect (cannot be done in parallel)
+ self._logger.debug("ExperimentController: Starting cross-connect")
for guid, testbed in self._testbeds.iteritems():
cross_data = self._get_cross_data(guid)
testbed.do_cross_connect_init(cross_data)
self._clear_caches()
# Last chance to configure (parallel on all testbeds)
+ self._logger.debug("ExperimentController: Starting parallel do_prestart")
self._parallel([testbed.do_prestart
for testbed in all_restart])
+ # final netref step, fail if anything's left unresolved
+ self.do_netrefs(data, fail_if_undefined=True)
+
self._clear_caches()
if not recover:
self.persist_execute_xml()
# start experiment (parallel start on all testbeds)
+ self._logger.debug("ExperimentController: Starting parallel do_start")
self._parallel([testbed.start
for testbed in all_restart])
conf.add_section(testbed_guid)
for attr in testbed_config.get_attribute_list():
if attr not in TRANSIENT:
- conf.set(testbed_guid, attr,
- testbed_config.get_attribute_value(attr))
+ value = testbed_config.get_attribute_value(attr)
+ if value is not None:
+ conf.set(testbed_guid, attr, value)
f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
conf.write(f)
getter = getattr(conf, TYPEMAP.get(
testbed_config.get_attribute_type(attr),
'get') )
- testbed_config.set_attribute_value(
- attr, getter(testbed_guid, attr))
+ try:
+ value = getter(testbed_guid, attr)
+ testbed_config.set_attribute_value(attr, value)
+ except ConfigParser.NoOptionError:
+ # Leave default
+ pass
def _unpersist_testbed_proxies(self):
try:
os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
except:
# Just print exceptions, this is just cleanup
- import traceback
- ######## BUG ##########
- #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
- #traceback.print_exc(file=sys.stderr)
+ self._logger.exception("Loading testbed configuration")
def _update_execute_xml(self):
# For all testbeds,
for testbed in self._testbeds.values():
testbed.stop()
self._unpersist_testbed_proxies()
+ self._stopped_time = time.time()
def recover(self):
# reload perviously persisted testbed access configurations
def shutdown(self):
exceptions = list()
- for testbed in self._testbeds.values():
+ ordered_testbeds = set()
+
+ def shutdown_testbed(guid):
try:
+ testbed = self._testbeds[guid]
+ ordered_testbeds.add(guid)
testbed.shutdown()
except:
exceptions.append(sys.exc_info())
+
+ self._logger.debug("ExperimentController: Starting parallel shutdown")
+
+ for testbed_guids in reversed(self._testbed_order):
+ testbed_guids = set(testbed_guids) - ordered_testbeds
+ self._logger.debug("ExperimentController: Shutting down %r", testbed_guids)
+ self._parallel([functools.partial(shutdown_testbed, guid)
+ for guid in testbed_guids])
+ remaining_guids = set(self._testbeds) - ordered_testbeds
+ if remaining_guids:
+ self._logger.debug("ExperimentController: Shutted down %r", ordered_testbeds)
+ self._logger.debug("ExperimentController: Shutting down %r", remaining_guids)
+ self._parallel([functools.partial(shutdown_testbed, guid)
+ for guid in remaining_guids])
+
for exc_info in exceptions:
raise exc_info[0], exc_info[1], exc_info[2]
testbed.get_route(guid, int(index), name),
'trace' :
lambda testbed, guid, index, name:
- testbed.trace(guid, index, name),
+ testbed.trace(guid, index, attribute = name),
'' :
lambda testbed, guid, index, name:
testbed.get(guid, name),
}
def resolve_netref_value(self, value, failval = None):
- match = ATTRIBUTE_PATTERN_BASE.search(value)
- if match:
- label = match.group("label")
- if label.startswith('GUID-'):
- ref_guid = int(label[5:])
- if ref_guid:
- expr = match.group("expr")
- component = (match.group("component") or "")[1:] # skip the dot
- attribute = match.group("attribute")
-
- # split compound components into component kind and index
- # eg: 'addr[0]' -> ('addr', '0')
- component, component_index = self._netref_component_split(component)
-
- # find object and resolve expression
- for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
- if component not in self._NETREF_COMPONENT_GETTERS:
- raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
- elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
- pass
+ rv = failval
+ while True:
+ for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
+ label = match.group("label")
+ if label.startswith('GUID-'):
+ ref_guid = int(label[5:])
+ if ref_guid:
+ expr = match.group("expr")
+ component = (match.group("component") or "")[1:] # skip the dot
+ attribute = match.group("attribute")
+
+ # split compound components into component kind and index
+ # eg: 'addr[0]' -> ('addr', '0')
+ component, component_index = self._netref_component_split(component)
+
+ # find object and resolve expression
+ for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
+ if component not in self._NETREF_COMPONENT_GETTERS:
+ raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
+ elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
+ pass
+ else:
+ ref_value = self._NETREF_COMPONENT_GETTERS[component](
+ ref_testbed, ref_guid, component_index, attribute)
+ if ref_value:
+ value = rv = value.replace(match.group(), ref_value)
+ break
else:
- ref_value = self._NETREF_COMPONENT_GETTERS[component](
- ref_testbed, ref_guid, component_index, attribute)
- if ref_value:
- return value.replace(match.group(), ref_value)
- # couldn't find value
- return failval
+ # unresolvable netref
+ return failval
+ break
+ else:
+ break
+ return rv
def do_netrefs(self, data, fail_if_undefined = False):
# element netrefs
for guid in data_guids:
for name, value in data.get_attribute_data(guid):
if isinstance(value, basestring):
- match = ATTRIBUTE_PATTERN_BASE.search(value)
- if match:
- label = match.group("label")
- if not label.startswith('GUID-'):
- ref_guid = label_guids.get(label)
- if ref_guid is not None:
- value = ATTRIBUTE_PATTERN_BASE.sub(
- ATTRIBUTE_PATTERN_GUID_SUB % dict(
- guid = 'GUID-%d' % (ref_guid,),
- expr = match.group("expr"),
- label = label),
- value)
- data.set_attribute_data(guid, name, value)
-
- # memorize which guid-attribute pairs require
- # postprocessing, to avoid excessive controller-testbed
- # communication at configuration time
- # (which could require high-latency network I/O)
- if not data.is_testbed_data(guid):
- (testbed_guid, factory_id) = data.get_box_data(guid)
- netrefs[(testbed_guid, guid)].add(name)
- else:
- testbed_netrefs[guid].add(name)
+ while True:
+ for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
+ label = match.group("label")
+ if not label.startswith('GUID-'):
+ ref_guid = label_guids.get(label)
+ if ref_guid is not None:
+ value = value.replace(
+ match.group(),
+ ATTRIBUTE_PATTERN_GUID_SUB % dict(
+ guid = 'GUID-%d' % (ref_guid,),
+ expr = match.group("expr"),
+ label = label)
+ )
+ data.set_attribute_data(guid, name, value)
+
+ # memorize which guid-attribute pairs require
+ # postprocessing, to avoid excessive controller-testbed
+ # communication at configuration time
+ # (which could require high-latency network I/O)
+ if not data.is_testbed_data(guid):
+ (testbed_guid, factory_id) = data.get_box_data(guid)
+ netrefs[(testbed_guid, guid)].add(name)
+ else:
+ testbed_netrefs[guid].add(name)
+
+ break
+ else:
+ break
def _create_testbed_controller(self, guid, data, element_guids, recover):
(testbed_id, testbed_version) = data.get_testbed_data(guid)
testbed.defer_add_address(guid, address, netprefix,
broadcast)
# routes
- for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
- testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
+ for (destination, netprefix, nexthop, metric, device) in \
+ data.get_route_data(guid):
+ testbed.defer_add_route(guid, destination, netprefix, nexthop,
+ metric, device)
# store connections data
for (connector_type_name, other_guid, other_connector_type_name) \
in data.get_connection_data(guid):
cross_testbed_guid, cross_testbed_id, cross_factory_id,
cross_connector_type_name)
# save cross data for later
+ self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
+ (testbed_guid, guid, cross_testbed_guid, cross_guid))
self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
cross_guid)
elem_cross_data[attr_name] = _undefer(attr_value)
return cross_data
-
+
+class ExperimentSuite(object):
+ def __init__(self, experiment_xml, access_config, repetitions = None,
+ duration = None, wait_guids = None):
+ self._experiment_xml = experiment_xml
+ self._access_config = access_config
+ self._controllers = dict()
+ self._access_configs = dict()
+ self._repetitions = 1 if not repetitions else repetitions
+ self._duration = duration
+ self._wait_guids = wait_guids
+ self._current = None
+ self._status = TS.STATUS_ZERO
+ self._thread = None
+
+ def current(self):
+ return self._current
+
+ def status(self):
+ return self._status
+
+ def is_finished(self):
+ return self._status == TS.STATUS_STOPPED
+
+ def get_access_configurations(self):
+ return self._access_configs.values()
+
+ def start(self):
+ self._status = TS.STATUS_STARTED
+ self._thread = threading.Thread(target = self._run_experiment_suite)
+ self._thread.start()
+
+ def shutdown(self):
+ if self._thread:
+ self._thread.join()
+ self._thread = None
+ for controller in self._controllers.values():
+ controller.shutdown()
+
+ def get_current_access_config(self):
+ return self._access_configs[self._current]
+
+ def _run_experiment_suite(self):
+ for i in xrange(1, self._repetitions):
+ self._current = i
+ self._run_one_experiment()
+ self._status = TS.STATUS_STOPPED
+
+ def _run_one_experiment(self):
+ from nepi.util import proxy
+ access_config = proxy.AccessConfiguration()
+ for attr in self._access_config.attributes:
+ if attr.value:
+ access_config.set_attribute_value(attr.name, attr.value)
+ access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ root_dir = "%s_%d" % (
+ access_config.get_attribute_value(DC.ROOT_DIRECTORY),
+ self._current)
+ access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
+ controller = proxy.create_experiment_controller(self._experiment_xml,
+ access_config)
+ self._access_configs[self._current] = access_config
+ self._controllers[self._current] = controller
+ controller.start()
+ started_at = time.time()
+ # wait until all specified guids have finished execution
+ if self._wait_guids:
+ while all(itertools.imap(controller.is_finished, self._wait_guids)):
+ time.sleep(0.5)
+ # wait until the minimum experiment duration time has elapsed
+ if self._duration:
+ while (time.time() - started_at) < self._duration:
+ time.sleep(0.5)
+ controller.stop()
+