2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
19 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
20 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
21 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
23 def _undefer(deferred):
24 if hasattr(deferred, '_get'):
25 return deferred._get()
30 class TestbedController(object):
31 def __init__(self, testbed_id, testbed_version):
32 self._testbed_id = testbed_id
33 self._testbed_version = testbed_version
37 return self._testbed_id
40 def testbed_version(self):
41 return self._testbed_version
45 raise NotImplementedError
47 def defer_configure(self, name, value):
48 """Instructs setting a configuartion attribute for the testbed instance"""
49 raise NotImplementedError
51 def defer_create(self, guid, factory_id):
52 """Instructs creation of element """
53 raise NotImplementedError
55 def defer_create_set(self, guid, name, value):
56 """Instructs setting an initial attribute on an element"""
57 raise NotImplementedError
59 def defer_factory_set(self, guid, name, value):
60 """Instructs setting an attribute on a factory"""
61 raise NotImplementedError
63 def defer_connect(self, guid1, connector_type_name1, guid2,
64 connector_type_name2):
65 """Instructs creation of a connection between the given connectors"""
66 raise NotImplementedError
68 def defer_cross_connect(self,
69 guid, connector_type_name,
70 cross_guid, cross_testbed_guid,
71 cross_testbed_id, cross_factory_id,
72 cross_connector_type_name):
74 Instructs creation of a connection between the given connectors
75 of different testbed instances
77 raise NotImplementedError
79 def defer_add_trace(self, guid, trace_id):
80 """Instructs the addition of a trace"""
81 raise NotImplementedError
83 def defer_add_address(self, guid, address, netprefix, broadcast):
84 """Instructs the addition of an address"""
85 raise NotImplementedError
87 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
88 """Instructs the addition of a route"""
89 raise NotImplementedError
92 """After do_setup the testbed initial configuration is done"""
93 raise NotImplementedError
97 After do_create all instructed elements are created and
100 raise NotImplementedError
102 def do_connect_init(self):
104 After do_connect_init all internal connections between testbed elements
107 raise NotImplementedError
109 def do_connect_compl(self):
111 After do_connect all internal connections between testbed elements
114 raise NotImplementedError
116 def do_preconfigure(self):
118 Done just before resolving netrefs, after connection, before cross connections,
119 useful for early stages of configuration, for setting up stuff that might be
120 required for netref resolution.
122 raise NotImplementedError
124 def do_configure(self):
125 """After do_configure elements are configured"""
126 raise NotImplementedError
128 def do_prestart(self):
129 """Before do_start elements are prestart-configured"""
130 raise NotImplementedError
132 def do_cross_connect_init(self, cross_data):
134 After do_cross_connect_init initiation of all external connections
135 between different testbed elements is performed
137 raise NotImplementedError
139 def do_cross_connect_compl(self, cross_data):
141 After do_cross_connect_compl completion of all external connections
142 between different testbed elements is performed
144 raise NotImplementedError
147 raise NotImplementedError
150 raise NotImplementedError
154 On testbed recovery (if recovery is a supported policy), the controller
155 instance will be re-created and the following sequence invoked:
158 defer_X - programming the testbed with persisted execution values
159 (not design values). Execution values (ExecImmutable attributes)
160 should be enough to recreate the testbed's state.
162 <cross-connection methods>
164 Start will not be called, and after cross connection invocations,
165 the testbed is supposed to be fully functional again.
167 raise NotImplementedError
169 def set(self, guid, name, value, time = TIME_NOW):
170 raise NotImplementedError
172 def get(self, guid, name, time = TIME_NOW):
173 raise NotImplementedError
175 def get_route(self, guid, index, attribute):
179 guid: guid of box to query
180 index: number of routing entry to fetch
181 attribute: one of Destination, NextHop, NetPrefix
183 raise NotImplementedError
185 def get_address(self, guid, index, attribute='Address'):
189 guid: guid of box to query
190 index: number of inteface to select
191 attribute: one of Address, NetPrefix, Broadcast
193 raise NotImplementedError
195 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
196 raise NotImplementedError
198 def get_factory_id(self, guid):
199 raise NotImplementedError
201 def action(self, time, guid, action):
202 raise NotImplementedError
204 def status(self, guid):
205 raise NotImplementedError
207 def testbed_status(self):
208 raise NotImplementedError
210 def trace(self, guid, trace_id, attribute='value'):
211 raise NotImplementedError
213 def traces_info(self):
214 """ dictionary of dictionaries:
220 filesize = size in bytes,
224 raise NotImplementedError
227 raise NotImplementedError
229 class ExperimentController(object):
230 def __init__(self, experiment_xml, root_dir):
231 self._experiment_design_xml = experiment_xml
232 self._experiment_execute_xml = None
233 self._testbeds = dict()
234 self._deployment_config = dict()
235 self._netrefs = collections.defaultdict(set)
236 self._testbed_netrefs = collections.defaultdict(set)
237 self._cross_data = dict()
238 self._root_dir = root_dir
239 self._netreffed_testbeds = set()
240 self._guids_in_testbed_cache = dict()
241 self._failed_testbeds = set()
242 self._started_time = None
243 self._stopped_time = None
245 self._logger = logging.getLogger('nepi.core.execute')
247 if experiment_xml is None and root_dir is not None:
249 self.load_experiment_xml()
250 self.load_execute_xml()
252 self.persist_experiment_xml()
255 def experiment_design_xml(self):
256 return self._experiment_design_xml
259 def experiment_execute_xml(self):
260 return self._experiment_execute_xml
263 def started_time(self):
264 return self._started_time
267 def stopped_time(self):
268 return self._stopped_time
273 for testbed_guid in self._testbeds.keys():
274 _guids = self._guids_in_testbed(testbed_guid)
279 def persist_experiment_xml(self):
280 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
281 f = open(xml_path, "w")
282 f.write(self._experiment_design_xml)
285 def persist_execute_xml(self):
286 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
287 f = open(xml_path, "w")
288 f.write(self._experiment_execute_xml)
291 def load_experiment_xml(self):
292 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
293 f = open(xml_path, "r")
294 self._experiment_design_xml = f.read()
297 def load_execute_xml(self):
298 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
299 f = open(xml_path, "r")
300 self._experiment_execute_xml = f.read()
303 def trace(self, guid, trace_id, attribute='value'):
304 testbed = self._testbed_for_guid(guid)
306 return testbed.trace(guid, trace_id, attribute)
307 raise RuntimeError("No element exists with guid %d" % guid)
309 def traces_info(self):
311 for guid, testbed in self._testbeds.iteritems():
312 tinfo = testbed.traces_info()
314 traces_info[guid] = testbed.traces_info()
318 def _parallel(callables):
321 @functools.wraps(callable)
322 def wrapped(*p, **kw):
326 logging.exception("Exception occurred in asynchronous thread:")
327 excs.append(sys.exc_info())
329 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
330 for thread in threads:
332 for thread in threads:
335 eTyp, eVal, eLoc = exc
336 raise eTyp, eVal, eLoc
339 self._started_time = time.time()
342 def _start(self, recover = False):
343 parser = XmlExperimentParser()
346 xml = self._experiment_execute_xml
348 xml = self._experiment_design_xml
349 data = parser.from_xml_to_data(xml)
351 # instantiate testbed controllers
352 to_recover, to_restart = self._init_testbed_controllers(data, recover)
353 all_restart = set(to_restart)
356 # persist testbed connection data, for potential recovery
357 self._persist_testbed_proxies()
359 # recover recoverable controllers
360 for guid in to_recover:
362 self._testbeds[guid].do_setup()
363 self._testbeds[guid].recover()
365 self._logger.exception("During recovery of testbed %s", guid)
368 self._failed_testbeds.add(guid)
370 def steps_to_configure(self, allowed_guids):
371 # perform setup in parallel for all test beds,
372 # wait for all threads to finish
373 self._parallel([testbed.do_setup
374 for guid,testbed in self._testbeds.iteritems()
375 if guid in allowed_guids])
377 # perform create-connect in parallel, wait
378 # (internal connections only)
379 self._parallel([testbed.do_create
380 for guid,testbed in self._testbeds.iteritems()
381 if guid in allowed_guids])
383 self._parallel([testbed.do_connect_init
384 for guid,testbed in self._testbeds.iteritems()
385 if guid in allowed_guids])
387 self._parallel([testbed.do_connect_compl
388 for guid,testbed in self._testbeds.iteritems()
389 if guid in allowed_guids])
391 self._parallel([testbed.do_preconfigure
392 for guid,testbed in self._testbeds.iteritems()
393 if guid in allowed_guids])
396 steps_to_configure(self, to_restart)
398 if self._netreffed_testbeds:
399 # initally resolve netrefs
400 self.do_netrefs(data, fail_if_undefined=False)
402 # rinse and repeat, for netreffed testbeds
403 netreffed_testbeds = set(self._netreffed_testbeds)
405 to_recover, to_restart = self._init_testbed_controllers(data, recover)
406 all_restart.update(to_restart)
409 # persist testbed connection data, for potential recovery
410 self._persist_testbed_proxies()
412 # recover recoverable controllers
413 for guid in to_recover:
415 self._testbeds[guid].do_setup()
416 self._testbeds[guid].recover()
418 self._logger.exception("During recovery of testbed %s", guid)
421 self._failed_testbeds.add(guid)
423 # configure dependant testbeds
424 steps_to_configure(self, to_restart)
426 all_restart = [ self._testbeds[guid] for guid in all_restart ]
428 # final netref step, fail if anything's left unresolved
429 self.do_netrefs(data, fail_if_undefined=False)
431 # Only now, that netref dependencies have been solve, it is safe to
432 # program cross_connections
433 self._program_testbed_cross_connections(data)
435 # perform do_configure in parallel for al testbeds
436 # (it's internal configuration for each)
437 self._parallel([testbed.do_configure
438 for testbed in all_restart])
442 #print >>sys.stderr, "DO IT"
446 # cross-connect (cannot be done in parallel)
447 for guid, testbed in self._testbeds.iteritems():
448 cross_data = self._get_cross_data(guid)
449 testbed.do_cross_connect_init(cross_data)
450 for guid, testbed in self._testbeds.iteritems():
451 cross_data = self._get_cross_data(guid)
452 testbed.do_cross_connect_compl(cross_data)
456 # Last chance to configure (parallel on all testbeds)
457 self._parallel([testbed.do_prestart
458 for testbed in all_restart])
460 # final netref step, fail if anything's left unresolved
461 self.do_netrefs(data, fail_if_undefined=True)
466 # update execution xml with execution-specific values
467 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
468 self._update_execute_xml()
469 self.persist_execute_xml()
471 # start experiment (parallel start on all testbeds)
472 self._parallel([testbed.start
473 for testbed in all_restart])
477 def _clear_caches(self):
478 # Cleaning cache for safety.
479 self._guids_in_testbed_cache = dict()
481 def _persist_testbed_proxies(self):
482 TRANSIENT = (DC.RECOVER,)
484 # persist access configuration for all testbeds, so that
485 # recovery mode can reconnect to them if it becomes necessary
486 conf = ConfigParser.RawConfigParser()
487 for testbed_guid, testbed_config in self._deployment_config.iteritems():
488 testbed_guid = str(testbed_guid)
489 conf.add_section(testbed_guid)
490 for attr in testbed_config.get_attribute_list():
491 if attr not in TRANSIENT:
492 conf.set(testbed_guid, attr,
493 testbed_config.get_attribute_value(attr))
495 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
499 def _load_testbed_proxies(self):
501 Attribute.STRING : 'get',
502 Attribute.BOOL : 'getboolean',
503 Attribute.ENUM : 'get',
504 Attribute.DOUBLE : 'getfloat',
505 Attribute.INTEGER : 'getint',
508 TRANSIENT = (DC.RECOVER,)
510 # deferred import because proxy needs
511 # our class definitions to define proxies
512 import nepi.util.proxy as proxy
514 conf = ConfigParser.RawConfigParser()
515 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
516 for testbed_guid in conf.sections():
517 testbed_config = proxy.AccessConfiguration()
518 testbed_guid = str(testbed_guid)
519 for attr in testbed_config.get_attribute_list():
520 if attr not in TRANSIENT:
521 getter = getattr(conf, TYPEMAP.get(
522 testbed_config.get_attribute_type(attr),
524 testbed_config.set_attribute_value(
525 attr, getter(testbed_guid, attr))
527 def _unpersist_testbed_proxies(self):
529 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
531 # Just print exceptions, this is just cleanup
532 self._logger.exception("Loading testbed configuration")
534 def _update_execute_xml(self):
536 # For all elements in testbed,
537 # - gather immutable execute-readable attribuets lists
539 # Generate new design description from design xml
540 # (Wait for attributes lists - implicit syncpoint)
542 # For all elements in testbed,
543 # - gather all immutable execute-readable attribute
544 # values, asynchronously
545 # (Wait for attribute values - implicit syncpoint)
547 # For all elements in testbed,
548 # - inject non-None values into new design
549 # Generate execute xml from new design
551 attribute_lists = dict(
552 (testbed_guid, collections.defaultdict(dict))
553 for testbed_guid in self._testbeds
556 for testbed_guid, testbed in self._testbeds.iteritems():
557 guids = self._guids_in_testbed(testbed_guid)
559 attribute_lists[testbed_guid][guid] = \
560 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
562 parser = XmlExperimentParser()
563 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
565 attribute_values = dict(
566 (testbed_guid, collections.defaultdict(dict))
567 for testbed_guid in self._testbeds
570 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
571 testbed = self._testbeds[testbed_guid]
572 for guid, attribute_list in testbed_attribute_lists.iteritems():
573 attribute_list = _undefer(attribute_list)
574 attribute_values[testbed_guid][guid] = dict(
575 (attribute, testbed.get_deferred(guid, attribute))
576 for attribute in attribute_list
579 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
580 for guid, attribute_values in testbed_attribute_values.iteritems():
581 for attribute, value in attribute_values.iteritems():
582 value = _undefer(value)
583 if value is not None:
584 execute_data.add_attribute_data(guid, attribute, value)
586 self._experiment_execute_xml = parser.to_xml(data=execute_data)
589 for testbed in self._testbeds.values():
591 self._unpersist_testbed_proxies()
592 self._stopped_time = time.time()
595 # reload perviously persisted testbed access configurations
596 self._failed_testbeds.clear()
597 self._load_testbed_proxies()
599 # re-program testbeds that need recovery
600 self._start(recover = True)
602 def is_finished(self, guid):
603 testbed = self._testbed_for_guid(guid)
605 return testbed.status(guid) == AS.STATUS_FINISHED
606 raise RuntimeError("No element exists with guid %d" % guid)
608 def _testbed_recovery_policy(self, guid, data = None):
610 parser = XmlExperimentParser()
611 data = parser.from_xml_to_data(self._experiment_design_xml)
613 return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
615 def status(self, guid):
616 if guid in self._testbeds:
618 # report testbed status
619 if guid in self._failed_testbeds:
620 return TS.STATUS_FAILED
623 return self._testbeds[guid].status()
625 return TS.STATUS_UNRESPONSIVE
628 testbed = self._testbed_for_guid(guid)
629 if testbed is not None:
630 return testbed.status(guid)
632 return AS.STATUS_UNDETERMINED
634 def set(self, guid, name, value, time = TIME_NOW):
635 testbed = self._testbed_for_guid(guid)
637 testbed.set(guid, name, value, time)
639 raise RuntimeError("No element exists with guid %d" % guid)
641 def get(self, guid, name, time = TIME_NOW):
642 testbed = self._testbed_for_guid(guid)
644 return testbed.get(guid, name, time)
645 raise RuntimeError("No element exists with guid %d" % guid)
647 def get_deferred(self, guid, name, time = TIME_NOW):
648 testbed = self._testbed_for_guid(guid)
650 return testbed.get_deferred(guid, name, time)
651 raise RuntimeError("No element exists with guid %d" % guid)
653 def get_factory_id(self, guid):
654 testbed = self._testbed_for_guid(guid)
656 return testbed.get_factory_id(guid)
657 raise RuntimeError("No element exists with guid %d" % guid)
659 def get_testbed_id(self, guid):
660 testbed = self._testbed_for_guid(guid)
662 return testbed.testbed_id
663 raise RuntimeError("No element exists with guid %d" % guid)
665 def get_testbed_version(self, guid):
666 testbed = self._testbed_for_guid(guid)
668 return testbed.testbed_version
669 raise RuntimeError("No element exists with guid %d" % guid)
673 for testbed in self._testbeds.values():
677 exceptions.append(sys.exc_info())
678 for exc_info in exceptions:
679 raise exc_info[0], exc_info[1], exc_info[2]
681 def _testbed_for_guid(self, guid):
682 for testbed_guid in self._testbeds.keys():
683 if guid in self._guids_in_testbed(testbed_guid):
684 if testbed_guid in self._failed_testbeds:
686 return self._testbeds[testbed_guid]
689 def _guids_in_testbed(self, testbed_guid):
690 if testbed_guid not in self._testbeds:
692 if testbed_guid not in self._guids_in_testbed_cache:
693 self._guids_in_testbed_cache[testbed_guid] = \
694 set(self._testbeds[testbed_guid].guids)
695 return self._guids_in_testbed_cache[testbed_guid]
698 def _netref_component_split(component):
699 match = COMPONENT_PATTERN.match(component)
701 return match.group("kind"), match.group("index")
703 return component, None
705 _NETREF_COMPONENT_GETTERS = {
707 lambda testbed, guid, index, name:
708 testbed.get_address(guid, int(index), name),
710 lambda testbed, guid, index, name:
711 testbed.get_route(guid, int(index), name),
713 lambda testbed, guid, index, name:
714 testbed.trace(guid, index, attribute = name),
716 lambda testbed, guid, index, name:
717 testbed.get(guid, name),
720 def resolve_netref_value(self, value, failval = None):
723 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
724 label = match.group("label")
725 if label.startswith('GUID-'):
726 ref_guid = int(label[5:])
728 expr = match.group("expr")
729 component = (match.group("component") or "")[1:] # skip the dot
730 attribute = match.group("attribute")
732 # split compound components into component kind and index
733 # eg: 'addr[0]' -> ('addr', '0')
734 component, component_index = self._netref_component_split(component)
736 # find object and resolve expression
737 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
738 if component not in self._NETREF_COMPONENT_GETTERS:
739 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
740 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
743 ref_value = self._NETREF_COMPONENT_GETTERS[component](
744 ref_testbed, ref_guid, component_index, attribute)
746 value = rv = value.replace(match.group(), ref_value)
749 # unresolvable netref
756 def do_netrefs(self, data, fail_if_undefined = False):
758 for (testbed_guid, guid), attrs in self._netrefs.items():
759 testbed = self._testbeds.get(testbed_guid)
760 if testbed is not None:
761 for name in set(attrs):
762 value = testbed.get(guid, name)
763 if isinstance(value, basestring):
764 ref_value = self.resolve_netref_value(value)
765 if ref_value is not None:
766 testbed.set(guid, name, ref_value)
768 elif fail_if_undefined:
769 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
771 del self._netrefs[(testbed_guid, guid)]
774 for testbed_guid, attrs in self._testbed_netrefs.items():
775 tb_data = dict(data.get_attribute_data(testbed_guid))
777 for name in set(attrs):
778 value = tb_data.get(name)
779 if isinstance(value, basestring):
780 ref_value = self.resolve_netref_value(value)
781 if ref_value is not None:
782 data.set_attribute_data(testbed_guid, name, ref_value)
784 elif fail_if_undefined:
785 raise ValueError, "Unresolvable netref in: %r" % (value,)
787 del self._testbed_netrefs[testbed_guid]
790 def _init_testbed_controllers(self, data, recover = False):
791 blacklist_testbeds = set(self._testbeds)
792 element_guids = list()
794 data_guids = data.guids
798 # gather label associations
799 for guid in data_guids:
800 if not data.is_testbed_data(guid):
801 (testbed_guid, factory_id) = data.get_box_data(guid)
802 label = data.get_attribute_data(guid, "label")
803 if label is not None:
804 if label in label_guids:
805 raise RuntimeError, "Label %r is not unique" % (label,)
806 label_guids[label] = guid
808 # create testbed controllers
809 for guid in data_guids:
810 if data.is_testbed_data(guid):
811 if guid not in self._testbeds:
813 self._create_testbed_controller(
814 guid, data, element_guids, recover)
817 blacklist_testbeds.add(guid)
822 policy = self._testbed_recovery_policy(guid, data=data)
823 if policy == DC.POLICY_RECOVER:
824 self._create_testbed_controller(
825 guid, data, element_guids, False)
827 elif policy == DC.POLICY_RESTART:
828 self._create_testbed_controller(
829 guid, data, element_guids, False)
833 self._failed_testbeds.add(guid)
837 # queue programmable elements
838 # - that have not been programmed already (blacklist_testbeds)
839 # - including recovered or restarted testbeds
840 # - but those that have no unresolved netrefs
841 for guid in data_guids:
842 if not data.is_testbed_data(guid):
843 (testbed_guid, factory_id) = data.get_box_data(guid)
844 if testbed_guid not in blacklist_testbeds:
845 element_guids.append(guid)
847 # replace references to elements labels for its guid
848 self._resolve_labels(data, data_guids, label_guids)
850 # program testbed controllers
852 self._program_testbed_controllers(element_guids, data)
854 return to_recover, to_restart
856 def _resolve_labels(self, data, data_guids, label_guids):
857 netrefs = self._netrefs
858 testbed_netrefs = self._testbed_netrefs
859 for guid in data_guids:
860 for name, value in data.get_attribute_data(guid):
861 if isinstance(value, basestring):
863 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
864 label = match.group("label")
865 if not label.startswith('GUID-'):
866 ref_guid = label_guids.get(label)
867 if ref_guid is not None:
868 value = value.replace(
870 ATTRIBUTE_PATTERN_GUID_SUB % dict(
871 guid = 'GUID-%d' % (ref_guid,),
872 expr = match.group("expr"),
875 data.set_attribute_data(guid, name, value)
877 # memorize which guid-attribute pairs require
878 # postprocessing, to avoid excessive controller-testbed
879 # communication at configuration time
880 # (which could require high-latency network I/O)
881 if not data.is_testbed_data(guid):
882 (testbed_guid, factory_id) = data.get_box_data(guid)
883 netrefs[(testbed_guid, guid)].add(name)
885 testbed_netrefs[guid].add(name)
891 def _create_testbed_controller(self, guid, data, element_guids, recover):
892 (testbed_id, testbed_version) = data.get_testbed_data(guid)
893 deployment_config = self._deployment_config.get(guid)
895 # deferred import because proxy needs
896 # our class definitions to define proxies
897 import nepi.util.proxy as proxy
899 if deployment_config is None:
901 deployment_config = proxy.AccessConfiguration()
903 for (name, value) in data.get_attribute_data(guid):
904 if value is not None and deployment_config.has_attribute(name):
905 # if any deployment config attribute has a netref, we can't
906 # create this controller yet
907 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
908 # remember to re-issue this one
909 self._netreffed_testbeds.add(guid)
912 # copy deployment config attribute
913 deployment_config.set_attribute_value(name, value)
916 self._deployment_config[guid] = deployment_config
918 if deployment_config is not None:
919 # force recovery mode
920 deployment_config.set_attribute_value("recover",recover)
922 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
924 for (name, value) in data.get_attribute_data(guid):
925 testbed.defer_configure(name, value)
926 self._testbeds[guid] = testbed
927 if guid in self._netreffed_testbeds:
928 self._netreffed_testbeds.remove(guid)
930 def _program_testbed_controllers(self, element_guids, data):
931 def resolve_create_netref(data, guid, name, value):
932 # Try to resolve create-time netrefs, if possible
933 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
935 nuvalue = self.resolve_netref_value(value)
937 # Any trouble means we're not in shape to resolve the netref yet
939 if nuvalue is not None:
940 # Only if we succeed we remove the netref deferral entry
942 data.set_attribute_data(guid, name, value)
943 if (testbed_guid, guid) in self._netrefs:
944 self._netrefs[(testbed_guid, guid)].discard(name)
947 for guid in element_guids:
948 (testbed_guid, factory_id) = data.get_box_data(guid)
949 testbed = self._testbeds.get(testbed_guid)
950 if testbed is not None:
952 testbed.defer_create(guid, factory_id)
954 for (name, value) in data.get_attribute_data(guid):
955 value = resolve_create_netref(data, guid, name, value)
956 testbed.defer_create_set(guid, name, value)
958 for guid in element_guids:
959 (testbed_guid, factory_id) = data.get_box_data(guid)
960 testbed = self._testbeds.get(testbed_guid)
961 if testbed is not None:
963 for trace_id in data.get_trace_data(guid):
964 testbed.defer_add_trace(guid, trace_id)
966 for (address, netprefix, broadcast) in data.get_address_data(guid):
968 testbed.defer_add_address(guid, address, netprefix,
971 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
972 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
973 # store connections data
974 for (connector_type_name, other_guid, other_connector_type_name) \
975 in data.get_connection_data(guid):
976 (other_testbed_guid, other_factory_id) = data.get_box_data(
978 if testbed_guid == other_testbed_guid:
979 # each testbed should take care of enforcing internal
980 # connection simmetry, so each connection is only
981 # added in one direction
982 testbed.defer_connect(guid, connector_type_name,
983 other_guid, other_connector_type_name)
985 def _program_testbed_cross_connections(self, data):
986 data_guids = data.guids
987 for guid in data_guids:
988 if not data.is_testbed_data(guid):
989 (testbed_guid, factory_id) = data.get_box_data(guid)
990 testbed = self._testbeds.get(testbed_guid)
991 if testbed is not None:
992 for (connector_type_name, cross_guid, cross_connector_type_name) \
993 in data.get_connection_data(guid):
994 (testbed_guid, factory_id) = data.get_box_data(guid)
995 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
997 if testbed_guid != cross_testbed_guid:
998 cross_testbed = self._testbeds[cross_testbed_guid]
999 cross_testbed_id = cross_testbed.testbed_id
1000 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
1001 cross_testbed_guid, cross_testbed_id, cross_factory_id,
1002 cross_connector_type_name)
1003 # save cross data for later
1004 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1007 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1008 if testbed_guid not in self._cross_data:
1009 self._cross_data[testbed_guid] = dict()
1010 if cross_testbed_guid not in self._cross_data[testbed_guid]:
1011 self._cross_data[testbed_guid][cross_testbed_guid] = set()
1012 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1014 def _get_cross_data(self, testbed_guid):
1016 if not testbed_guid in self._cross_data:
1019 # fetch attribute lists in one batch
1020 attribute_lists = dict()
1021 for cross_testbed_guid, guid_list in \
1022 self._cross_data[testbed_guid].iteritems():
1023 cross_testbed = self._testbeds[cross_testbed_guid]
1024 for cross_guid in guid_list:
1025 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1026 cross_testbed.get_attribute_list_deferred(cross_guid)
1028 # fetch attribute values in another batch
1029 for cross_testbed_guid, guid_list in \
1030 self._cross_data[testbed_guid].iteritems():
1031 cross_data[cross_testbed_guid] = dict()
1032 cross_testbed = self._testbeds[cross_testbed_guid]
1033 for cross_guid in guid_list:
1034 elem_cross_data = dict(
1036 _testbed_guid = cross_testbed_guid,
1037 _testbed_id = cross_testbed.testbed_id,
1038 _testbed_version = cross_testbed.testbed_version)
1039 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1040 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1041 for attr_name in attribute_list:
1042 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1043 elem_cross_data[attr_name] = attr_value
1045 # undefer all values - we'll have to serialize them probably later
1046 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1047 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1048 for attr_name, attr_value in elem_cross_data.iteritems():
1049 elem_cross_data[attr_name] = _undefer(attr_value)
1053 class ExperimentSuite(object):
1054 def __init__(self, experiment_xml, access_config, repetitions,
1055 duration, wait_guids):
1056 self._experiment_xml = experiment_xml
1057 self._access_config = access_config
1058 self._experiments = dict()
1059 self._repetitions = repetitions
1060 self._duration = duration
1061 self._wait_guids = wait_guids
1062 self._current = None
1063 self._status = TS.STATUS_ZERO
1067 self._status = TS.STATUS_STARTED
1068 self._thread = threading.Thread(target = self._run_experiment_suite)
1069 self._thread.start()
1076 def _run_experiment_suite(self):
1077 for i in xrange[0, self.repetitions]:
1079 self._run_one_experiment()
1081 def _run_one_experiment(self):
1082 access_config = proxy.AccessConfiguration()
1083 for attr in self._access_config.attributes:
1084 access_config.set_attribute_value(attr.name, attr.value)
1085 access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1086 root_dir = "%s_%d" % (
1087 access_config.get_attribute_value(DC.ROOT_DIRECTORY),
1089 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1090 controller = proxy.create_experiment_controller(self._experiment_xml,
1092 self._experiments[self._current] = controller
1094 started_at = time.time()
1095 # wait until all specified guids have finished execution
1096 if self._wait_guids:
1097 while all(itertools.imap(controller.is_finished, self._wait_guids):
1099 # wait until the minimum experiment duration time has elapsed
1101 while (time.time() - started_at) < self._duration:
1105 controller.shutdown()