import functools
import time
import logging
+logging.basicConfig()
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
self._failed_testbeds = set()
self._started_time = None
self._stopped_time = None
-
+ self._testbed_order = []
+
self._logger = logging.getLogger('nepi.core.execute')
-
+ level = logging.ERROR
+ if os.environ.get("NEPI_CONTROLLER_LOGLEVEL",
+ DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
+ level = logging.DEBUG
+ self._logger.setLevel(level)
+
if experiment_xml is None and root_dir is not None:
# Recover
self.load_experiment_xml()
def _parallel(callables):
excs = []
def wrap(callable):
- @functools.wraps(callable)
def wrapped(*p, **kw):
try:
callable(*p, **kw)
except:
logging.exception("Exception occurred in asynchronous thread:")
excs.append(sys.exc_info())
+ try:
+ wrapped = functools.wraps(callable)(wrapped)
+ except:
+ # functools.partial not wrappable
+ pass
return wrapped
threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
for thread in threads:
def steps_to_configure(self, allowed_guids):
# perform setup in parallel for all test beds,
# wait for all threads to finish
+
+ self._logger.debug("ExperimentController: Starting parallel do_setup")
self._parallel([testbed.do_setup
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
# perform create-connect in parallel, wait
# (internal connections only)
+ self._logger.debug("ExperimentController: Starting parallel do_create")
self._parallel([testbed.do_create
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
+ self._logger.debug("ExperimentController: Starting parallel do_connect_init")
self._parallel([testbed.do_connect_init
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
+ self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
self._parallel([testbed.do_connect_compl
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
+ self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
self._parallel([testbed.do_preconfigure
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
self._clear_caches()
+
+ # Store testbed order
+ self._testbed_order.append(allowed_guids)
steps_to_configure(self, to_restart)
if self._netreffed_testbeds:
+ self._logger.debug("ExperimentController: Resolving netreffed testbeds")
# initally resolve netrefs
self.do_netrefs(data, fail_if_undefined=False)
all_restart = [ self._testbeds[guid] for guid in all_restart ]
# final netref step, fail if anything's left unresolved
+ self._logger.debug("ExperimentController: Resolving do_netrefs")
self.do_netrefs(data, fail_if_undefined=False)
# Only now, that netref dependencies have been solve, it is safe to
# program cross_connections
+ self._logger.debug("ExperimentController: Programming testbed cross-connections")
self._program_testbed_cross_connections(data)
# perform do_configure in parallel for al testbeds
# (it's internal configuration for each)
+ self._logger.debug("ExperimentController: Starting parallel do_configure")
self._parallel([testbed.do_configure
for testbed in all_restart])
#time.sleep(60)
# cross-connect (cannot be done in parallel)
+ self._logger.debug("ExperimentController: Starting cross-connect")
for guid, testbed in self._testbeds.iteritems():
cross_data = self._get_cross_data(guid)
testbed.do_cross_connect_init(cross_data)
self._clear_caches()
# Last chance to configure (parallel on all testbeds)
+ self._logger.debug("ExperimentController: Starting parallel do_prestart")
self._parallel([testbed.do_prestart
for testbed in all_restart])
self.persist_execute_xml()
# start experiment (parallel start on all testbeds)
+ self._logger.debug("ExperimentController: Starting parallel do_start")
self._parallel([testbed.start
for testbed in all_restart])
conf.add_section(testbed_guid)
for attr in testbed_config.get_attribute_list():
if attr not in TRANSIENT:
- conf.set(testbed_guid, attr,
- testbed_config.get_attribute_value(attr))
+ value = testbed_config.get_attribute_value(attr)
+ if value is not None:
+ conf.set(testbed_guid, attr, value)
f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
conf.write(f)
getter = getattr(conf, TYPEMAP.get(
testbed_config.get_attribute_type(attr),
'get') )
- testbed_config.set_attribute_value(
- attr, getter(testbed_guid, attr))
+ try:
+ value = getter(testbed_guid, attr)
+ testbed_config.set_attribute_value(attr, value)
+ except ConfigParser.NoOptionError:
+ # Leave default
+ pass
def _unpersist_testbed_proxies(self):
try:
def shutdown(self):
exceptions = list()
- for testbed in self._testbeds.values():
+ ordered_testbeds = set()
+
+ def shutdown_testbed(guid):
try:
+ testbed = self._testbeds[guid]
+ ordered_testbeds.add(guid)
testbed.shutdown()
except:
exceptions.append(sys.exc_info())
+
+ self._logger.debug("ExperimentController: Starting parallel shutdown")
+
+ for testbed_guids in reversed(self._testbed_order):
+ self._parallel([functools.partial(shutdown_testbed, guid)
+ for guid in testbed_guids])
+ remaining_guids = set(self._testbeds) - ordered_testbeds
+ if remaining_guids:
+ self._parallel([functools.partial(shutdown_testbed, guid)
+ for guid in remaining_guids])
+
for exc_info in exceptions:
raise exc_info[0], exc_info[1], exc_info[2]
cross_testbed_guid, cross_testbed_id, cross_factory_id,
cross_connector_type_name)
# save cross data for later
+ self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
+ (testbed_guid, guid, cross_testbed_guid, cross_guid))
self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
cross_guid)