-#!/usr/bin/env python
# -*- coding: utf-8 -*-
from nepi.core.attributes import Attribute, AttributesMap
"""Instructs the addition of an address"""
raise NotImplementedError
- def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
+ def defer_add_route(self, guid, destination, netprefix, nexthop,
+ metric = 0, device = None):
"""Instructs the addition of a route"""
raise NotImplementedError
self._failed_testbeds = set()
self._started_time = None
self._stopped_time = None
+ self._testbed_order = []
self._logger = logging.getLogger('nepi.core.execute')
level = logging.ERROR
def _parallel(callables):
excs = []
def wrap(callable):
- @functools.wraps(callable)
def wrapped(*p, **kw):
try:
callable(*p, **kw)
except:
logging.exception("Exception occurred in asynchronous thread:")
excs.append(sys.exc_info())
+ try:
+ wrapped = functools.wraps(callable)(wrapped)
+ except:
+ # functools.partial not wrappable
+ pass
return wrapped
threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
for thread in threads:
for guid,testbed in self._testbeds.iteritems()
if guid in allowed_guids])
self._clear_caches()
+
+ # Store testbed order
+ self._testbed_order.append(allowed_guids)
steps_to_configure(self, to_restart)
def shutdown(self):
exceptions = list()
- for testbed in self._testbeds.values():
+ ordered_testbeds = set()
+
+ def shutdown_testbed(guid):
try:
+ testbed = self._testbeds[guid]
+ ordered_testbeds.add(guid)
testbed.shutdown()
except:
exceptions.append(sys.exc_info())
+
+ self._logger.debug("ExperimentController: Starting parallel shutdown")
+
+ for testbed_guids in reversed(self._testbed_order):
+ testbed_guids = set(testbed_guids) - ordered_testbeds
+ self._logger.debug("ExperimentController: Shutting down %r", testbed_guids)
+ self._parallel([functools.partial(shutdown_testbed, guid)
+ for guid in testbed_guids])
+ remaining_guids = set(self._testbeds) - ordered_testbeds
+ if remaining_guids:
+ self._logger.debug("ExperimentController: Shutted down %r", ordered_testbeds)
+ self._logger.debug("ExperimentController: Shutting down %r", remaining_guids)
+ self._parallel([functools.partial(shutdown_testbed, guid)
+ for guid in remaining_guids])
+
for exc_info in exceptions:
raise exc_info[0], exc_info[1], exc_info[2]
testbed.defer_add_address(guid, address, netprefix,
broadcast)
# routes
- for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
- testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
+ for (destination, netprefix, nexthop, metric, device) in \
+ data.get_route_data(guid):
+ testbed.defer_add_route(guid, destination, netprefix, nexthop,
+ metric, device)
# store connections data
for (connector_type_name, other_guid, other_connector_type_name) \
in data.get_connection_data(guid):
elem_cross_data[attr_name] = _undefer(attr_value)
return cross_data
- """
+
class ExperimentSuite(object):
- def __init__(self, experiment_xml, access_config, repetitions,
- duration, wait_guids):
+ def __init__(self, experiment_xml, access_config, repetitions = None,
+ duration = None, wait_guids = None):
self._experiment_xml = experiment_xml
self._access_config = access_config
- self._experiments = dict()
- self._repetitions = repetitions
+ self._controllers = dict()
+ self._access_configs = dict()
+ self._repetitions = 1 if not repetitions else repetitions
self._duration = duration
self._wait_guids = wait_guids
self._current = None
self._status = TS.STATUS_ZERO
self._thread = None
+ def current(self):
+ return self._current
+
+ def status(self):
+ return self._status
+
+ def is_finished(self):
+ return self._status == TS.STATUS_STOPPED
+
+ def get_access_configurations(self):
+ return self._access_configs.values()
+
def start(self):
self._status = TS.STATUS_STARTED
self._thread = threading.Thread(target = self._run_experiment_suite)
if self._thread:
self._thread.join()
self._thread = None
+ for controller in self._controllers.values():
+ controller.shutdown()
+
+ def get_current_access_config(self):
+ return self._access_configs[self._current]
def _run_experiment_suite(self):
- for i in xrange[0, self.repetitions]:
+ for i in xrange(1, self._repetitions):
self._current = i
self._run_one_experiment()
+ self._status = TS.STATUS_STOPPED
def _run_one_experiment(self):
+ from nepi.util import proxy
access_config = proxy.AccessConfiguration()
for attr in self._access_config.attributes:
- access_config.set_attribute_value(attr.name, attr.value)
+ if attr.value:
+ access_config.set_attribute_value(attr.name, attr.value)
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
root_dir = "%s_%d" % (
access_config.get_attribute_value(DC.ROOT_DIRECTORY),
access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
controller = proxy.create_experiment_controller(self._experiment_xml,
access_config)
- self._experiments[self._current] = controller
+ self._access_configs[self._current] = access_config
+ self._controllers[self._current] = controller
controller.start()
started_at = time.time()
# wait until all specified guids have finished execution
if self._wait_guids:
- while all(itertools.imap(controller.is_finished, self._wait_guids):
+ while all(itertools.imap(controller.is_finished, self._wait_guids)):
time.sleep(0.5)
# wait until the minimum experiment duration time has elapsed
if self._duration:
while (time.time() - started_at) < self._duration:
time.sleep(0.5)
controller.stop()
- #download results!!
- controller.shutdown()
- """
-
-
-
-
-
-