- (other_testbed_guid, other_factory_id) = data.get_box_data(
- other_guid)
- if testbed_guid == other_testbed_guid:
- testbed.defer_connect(guid, connector_type_name, other_guid,
- other_connector_type_name)
- else:
- testbed.defer_cross_connect(guid, connector_type_name, other_guid,
- other_testbed_id, other_factory_id, other_connector_type_name)
- for trace_id in data.get_trace_data(guid):
- testbed.defer_add_trace(guid, trace_id)
- for (autoconf, address, netprefix, broadcast) in \
- data.get_address_data(guid):
- if address != None:
- testbed.defer_add_address(guid, address, netprefix, broadcast)
- for (destination, netprefix, nexthop) in data.get_route_data(guid):
- testbed.defer_add_route(guid, destination, netprefix, nexthop)
+ testbed = self._testbeds.get(testbed_guid)
+ if testbed is not None:
+ for (connector_type_name, cross_guid, cross_connector_type_name) \
+ in data.get_connection_data(guid):
+ (testbed_guid, factory_id) = data.get_box_data(guid)
+ (cross_testbed_guid, cross_factory_id) = data.get_box_data(
+ cross_guid)
+ if testbed_guid != cross_testbed_guid:
+ cross_testbed = self._testbeds[cross_testbed_guid]
+ cross_testbed_id = cross_testbed.testbed_id
+ testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
+ cross_testbed_guid, cross_testbed_id, cross_factory_id,
+ cross_connector_type_name)
+ # save cross data for later
+ self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
+ (testbed_guid, guid, cross_testbed_guid, cross_guid))
+ self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
+ cross_guid)
+
+ def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
+ if testbed_guid not in self._cross_data:
+ self._cross_data[testbed_guid] = dict()
+ if cross_testbed_guid not in self._cross_data[testbed_guid]:
+ self._cross_data[testbed_guid][cross_testbed_guid] = set()
+ self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
+
+ def _get_cross_data(self, testbed_guid):
+ cross_data = dict()
+ if not testbed_guid in self._cross_data:
+ return cross_data
+
+ # fetch attribute lists in one batch
+ attribute_lists = dict()
+ for cross_testbed_guid, guid_list in \
+ self._cross_data[testbed_guid].iteritems():
+ cross_testbed = self._testbeds[cross_testbed_guid]
+ for cross_guid in guid_list:
+ attribute_lists[(cross_testbed_guid, cross_guid)] = \
+ cross_testbed.get_attribute_list_deferred(cross_guid)
+
+ # fetch attribute values in another batch
+ for cross_testbed_guid, guid_list in \
+ self._cross_data[testbed_guid].iteritems():
+ cross_data[cross_testbed_guid] = dict()
+ cross_testbed = self._testbeds[cross_testbed_guid]
+ for cross_guid in guid_list:
+ elem_cross_data = dict(
+ _guid = cross_guid,
+ _testbed_guid = cross_testbed_guid,
+ _testbed_id = cross_testbed.testbed_id,
+ _testbed_version = cross_testbed.testbed_version)
+ cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
+ attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
+ for attr_name in attribute_list:
+ attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
+ elem_cross_data[attr_name] = attr_value
+
+ # undefer all values - we'll have to serialize them probably later
+ for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
+ for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
+ for attr_name, attr_value in elem_cross_data.iteritems():
+ elem_cross_data[attr_name] = _undefer(attr_value)
+
+ return cross_data
+
+class ExperimentSuite(object):
+ def __init__(self, experiment_xml, access_config, repetitions = None,
+ duration = None, wait_guids = None):
+ self._experiment_xml = experiment_xml
+ self._access_config = access_config
+ self._controllers = dict()
+ self._access_configs = dict()
+ self._repetitions = 1 if not repetitions else repetitions
+ self._duration = duration
+ self._wait_guids = wait_guids
+ self._current = None
+ self._status = TS.STATUS_ZERO
+ self._thread = None
+
+ def current(self):
+ return self._current
+
+ def status(self):
+ return self._status
+
+ def is_finished(self):
+ return self._status == TS.STATUS_STOPPED
+
+ def get_access_configurations(self):
+ return self._access_configs.values()
+
+ def start(self):
+ self._status = TS.STATUS_STARTED
+ self._thread = threading.Thread(target = self._run_experiment_suite)
+ self._thread.start()
+
+ def shutdown(self):
+ if self._thread:
+ self._thread.join()
+ self._thread = None
+ for controller in self._controllers.values():
+ controller.shutdown()
+
+ def get_current_access_config(self):
+ return self._access_configs[self._current]
+
+ def _run_experiment_suite(self):
+ for i in xrange(1, self._repetitions):
+ self._current = i
+ self._run_one_experiment()
+ self._status = TS.STATUS_STOPPED
+
+ def _run_one_experiment(self):
+ from nepi.util import proxy
+ access_config = proxy.AccessConfiguration()
+ for attr in self._access_config.attributes:
+ if attr.value:
+ access_config.set_attribute_value(attr.name, attr.value)
+ access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ root_dir = "%s_%d" % (
+ access_config.get_attribute_value(DC.ROOT_DIRECTORY),
+ self._current)
+ access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
+ controller = proxy.create_experiment_controller(self._experiment_xml,
+ access_config)
+ self._access_configs[self._current] = access_config
+ self._controllers[self._current] = controller
+ controller.start()
+ started_at = time.time()
+ # wait until all specified guids have finished execution
+ if self._wait_guids:
+ while all(itertools.imap(controller.is_finished, self._wait_guids)):
+ time.sleep(0.5)
+ # wait until the minimum experiment duration time has elapsed
+ if self._duration:
+ while (time.time() - started_at) < self._duration:
+ time.sleep(0.5)
+ controller.stop()