2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
19 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
20 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
21 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
23 def _undefer(deferred):
24 if hasattr(deferred, '_get'):
25 return deferred._get()
30 class TestbedController(object):
31 def __init__(self, testbed_id, testbed_version):
32 self._testbed_id = testbed_id
33 self._testbed_version = testbed_version
37 return self._testbed_id
40 def testbed_version(self):
41 return self._testbed_version
45 raise NotImplementedError
def defer_configure(self, name, value):
    """
    Instruct setting a configuration attribute for the testbed instance.

    name: name of the configuration attribute.
    value: value to assign to it.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create(self, guid, factory_id):
    """
    Instruct the creation of an element.

    guid: guid of the element to create.
    factory_id: identifier of the factory used to create it.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create_set(self, guid, name, value):
    """
    Instruct setting an initial attribute on an element.

    guid: guid of the element.
    name: attribute name.
    value: initial attribute value.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_factory_set(self, guid, name, value):
    """
    Instruct setting an attribute on a factory.

    guid: guid the factory attribute is associated with.
    name: attribute name.
    value: attribute value.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_connect(self, guid1, connector_type_name1, guid2,
        connector_type_name2):
    """
    Instruct the creation of a connection between the given connectors.

    guid1, connector_type_name1: element and connector on one end.
    guid2, connector_type_name2: element and connector on the other end.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
68 def defer_cross_connect(self,
69 guid, connector_type_name,
70 cross_guid, cross_testbed_guid,
71 cross_testbed_id, cross_factory_id,
72 cross_connector_type_name):
74 Instructs creation of a connection between the given connectors
75 of different testbed instances
77 raise NotImplementedError
def defer_add_trace(self, guid, trace_id):
    """
    Instruct the addition of a trace.

    guid: guid of the element the trace belongs to.
    trace_id: identifier of the trace to add.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_address(self, guid, address, netprefix, broadcast):
    """
    Instruct the addition of an address.

    guid: guid of the element receiving the address.
    address, netprefix, broadcast: components of the address entry.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
    """
    Instruct the addition of a route.

    guid: guid of the element receiving the route.
    destination, netprefix, nexthop: components of the routing entry.
    metric: route metric, 0 when not given.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
92 """After do_setup the testbed initial configuration is done"""
93 raise NotImplementedError
97 After do_create all instructed elements are created and
100 raise NotImplementedError
102 def do_connect_init(self):
104 After do_connect_init all internal connections between testbed elements
107 raise NotImplementedError
109 def do_connect_compl(self):
111 After do_connect all internal connections between testbed elements
114 raise NotImplementedError
116 def do_preconfigure(self):
118 Done just before resolving netrefs, after connection, before cross connections,
119 useful for early stages of configuration, for setting up stuff that might be
120 required for netref resolution.
122 raise NotImplementedError
def do_configure(self):
    """
    Configure the elements; after this step elements are configured.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_prestart(self):
    """
    Apply prestart configuration; it takes place before do_start.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
132 def do_cross_connect_init(self, cross_data):
134 After do_cross_connect_init initiation of all external connections
135 between different testbed elements is performed
137 raise NotImplementedError
139 def do_cross_connect_compl(self, cross_data):
141 After do_cross_connect_compl completion of all external connections
142 between different testbed elements is performed
144 raise NotImplementedError
147 raise NotImplementedError
150 raise NotImplementedError
154 On testbed recovery (if recovery is a supported policy), the controller
155 instance will be re-created and the following sequence invoked:
158 defer_X - programming the testbed with persisted execution values
159 (not design values). Execution values (ExecImmutable attributes)
160 should be enough to recreate the testbed's state.
162 <cross-connection methods>
164 Start will not be called, and after cross connection invocations,
165 the testbed is supposed to be fully functional again.
167 raise NotImplementedError
def set(self, guid, name, value, time = TIME_NOW):
    """
    Set attribute `name` of element `guid` to `value` at the given time.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def get(self, guid, name, time = TIME_NOW):
    """
    Fetch attribute `name` of element `guid` at the given time.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
175 def get_route(self, guid, index, attribute):
179 guid: guid of box to query
180 index: number of routing entry to fetch
181 attribute: one of Destination, NextHop, NetPrefix
183 raise NotImplementedError
185 def get_address(self, guid, index, attribute='Address'):
189 guid: guid of box to query
190 index: number of inteface to select
191 attribute: one of Address, NetPrefix, Broadcast
193 raise NotImplementedError
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """
    List the attributes of element `guid`.

    filter_flags, exclude: optional filtering arguments; their exact
        semantics are defined by implementations (not visible here).

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_factory_id(self, guid):
    """
    Return the factory id of element `guid`.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def action(self, time, guid, action):
    """
    Perform `action` on element `guid` at the given time.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def status(self, guid):
    """
    Report the status of element `guid`.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
207 def testbed_status(self):
208 raise NotImplementedError
def trace(self, guid, trace_id, attribute='value'):
    """
    Fetch `attribute` of trace `trace_id` on element `guid`.

    attribute: which aspect of the trace to fetch, 'value' by default.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
213 def traces_info(self):
214 """ dictionary of dictionaries:
220 filesize = size in bytes,
224 raise NotImplementedError
227 raise NotImplementedError
229 class ExperimentController(object):
def __init__(self, experiment_xml, root_dir):
    """
    Initialise controller state and the on-disk experiment description.

    experiment_xml: design XML of the experiment, or None to reload the
        XML files previously persisted under root_dir.
    root_dir: directory where the experiment XML files are kept.
    """
    self._experiment_design_xml = experiment_xml
    self._experiment_execute_xml = None
    self._testbeds = dict()
    self._deployment_config = dict()
    self._netrefs = collections.defaultdict(set)
    self._testbed_netrefs = collections.defaultdict(set)
    self._cross_data = dict()
    self._root_dir = root_dir
    self._netreffed_testbeds = set()
    self._guids_in_testbed_cache = dict()
    self._failed_testbeds = set()
    self._started_time = None
    self._stopped_time = None

    # Errors only, unless debug logging is requested via the environment
    self._logger = logging.getLogger('nepi.core.execute')
    level = logging.ERROR
    if os.environ.get("NEPI_CONTROLLER_LOGLEVEL",
            DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
        level = logging.DEBUG
    self._logger.setLevel(level)

    if experiment_xml is None and root_dir is not None:
        # No design given: reload what an earlier controller persisted
        self.load_experiment_xml()
        self.load_execute_xml()
    else:
        # Fresh design: persist it so that it can be reloaded later
        self.persist_experiment_xml()
def experiment_design_xml(self):
    """Return the design XML currently held by the controller."""
    design_xml = self._experiment_design_xml
    return design_xml
def experiment_execute_xml(self):
    """Return the execute XML currently held by the controller (may be None)."""
    execute_xml = self._experiment_execute_xml
    return execute_xml
def started_time(self):
    """Return the recorded start timestamp, or None if none was recorded."""
    started = self._started_time
    return started
def stopped_time(self):
    """Return the recorded stop timestamp, or None if none was recorded."""
    stopped = self._stopped_time
    return stopped
278 for testbed_guid in self._testbeds.keys():
279 _guids = self._guids_in_testbed(testbed_guid)
def persist_experiment_xml(self):
    """Write the design XML to "experiment-design.xml" under the root directory."""
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    # The context manager closes (and flushes) the file even if write fails
    with open(xml_path, "w") as f:
        f.write(self._experiment_design_xml)
def persist_execute_xml(self):
    """Write the execute XML to "experiment-execute.xml" under the root directory."""
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    # The context manager closes (and flushes) the file even if write fails
    with open(xml_path, "w") as f:
        f.write(self._experiment_execute_xml)
def load_experiment_xml(self):
    """Read "experiment-design.xml" from the root directory into the controller."""
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    # The context manager releases the file handle as soon as it is read
    with open(xml_path, "r") as f:
        self._experiment_design_xml = f.read()
def load_execute_xml(self):
    """Read "experiment-execute.xml" from the root directory into the controller."""
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    # The context manager releases the file handle as soon as it is read
    with open(xml_path, "r") as f:
        self._experiment_execute_xml = f.read()
def trace(self, guid, trace_id, attribute='value'):
    """
    Fetch a trace of element `guid` from the testbed that owns it.

    attribute: which aspect of the trace to fetch, 'value' by default.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.trace(guid, trace_id, attribute)
314 def traces_info(self):
316 for guid, testbed in self._testbeds.iteritems():
317 tinfo = testbed.traces_info()
319 traces_info[guid] = testbed.traces_info()
323 def _parallel(callables):
326 @functools.wraps(callable)
327 def wrapped(*p, **kw):
331 logging.exception("Exception occurred in asynchronous thread:")
332 excs.append(sys.exc_info())
334 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
335 for thread in threads:
337 for thread in threads:
340 eTyp, eVal, eLoc = exc
341 raise eTyp, eVal, eLoc
344 self._started_time = time.time()
347 def _start(self, recover = False):
348 parser = XmlExperimentParser()
351 xml = self._experiment_execute_xml
353 xml = self._experiment_design_xml
354 data = parser.from_xml_to_data(xml)
356 # instantiate testbed controllers
357 to_recover, to_restart = self._init_testbed_controllers(data, recover)
358 all_restart = set(to_restart)
361 # persist testbed connection data, for potential recovery
362 self._persist_testbed_proxies()
364 # recover recoverable controllers
365 for guid in to_recover:
367 self._testbeds[guid].do_setup()
368 self._testbeds[guid].recover()
370 self._logger.exception("During recovery of testbed %s", guid)
373 self._failed_testbeds.add(guid)
375 def steps_to_configure(self, allowed_guids):
376 # perform setup in parallel for all test beds,
377 # wait for all threads to finish
379 self._logger.debug("ExperimentController: Starting parallel do_setup")
380 self._parallel([testbed.do_setup
381 for guid,testbed in self._testbeds.iteritems()
382 if guid in allowed_guids])
384 # perform create-connect in parallel, wait
385 # (internal connections only)
386 self._logger.debug("ExperimentController: Starting parallel do_create")
387 self._parallel([testbed.do_create
388 for guid,testbed in self._testbeds.iteritems()
389 if guid in allowed_guids])
391 self._logger.debug("ExperimentController: Starting parallel do_connect_init")
392 self._parallel([testbed.do_connect_init
393 for guid,testbed in self._testbeds.iteritems()
394 if guid in allowed_guids])
396 self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
397 self._parallel([testbed.do_connect_compl
398 for guid,testbed in self._testbeds.iteritems()
399 if guid in allowed_guids])
401 self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
402 self._parallel([testbed.do_preconfigure
403 for guid,testbed in self._testbeds.iteritems()
404 if guid in allowed_guids])
407 steps_to_configure(self, to_restart)
409 self._logger.debug("ExperimentController: Resolving netreffed testbeds")
410 if self._netreffed_testbeds:
411 # initally resolve netrefs
412 self.do_netrefs(data, fail_if_undefined=False)
414 # rinse and repeat, for netreffed testbeds
415 netreffed_testbeds = set(self._netreffed_testbeds)
417 to_recover, to_restart = self._init_testbed_controllers(data, recover)
418 all_restart.update(to_restart)
421 # persist testbed connection data, for potential recovery
422 self._persist_testbed_proxies()
424 # recover recoverable controllers
425 for guid in to_recover:
427 self._testbeds[guid].do_setup()
428 self._testbeds[guid].recover()
430 self._logger.exception("During recovery of testbed %s", guid)
433 self._failed_testbeds.add(guid)
435 # configure dependant testbeds
436 steps_to_configure(self, to_restart)
438 all_restart = [ self._testbeds[guid] for guid in all_restart ]
440 # final netref step, fail if anything's left unresolved
441 self._logger.debug("ExperimentController: Resolving do_netrefs")
442 self.do_netrefs(data, fail_if_undefined=False)
444 # Only now, that netref dependencies have been solve, it is safe to
445 # program cross_connections
446 self._program_testbed_cross_connections(data)
448 # perform do_configure in parallel for al testbeds
449 # (it's internal configuration for each)
450 self._logger.debug("ExperimentController: Starting parallel do_configure")
451 self._parallel([testbed.do_configure
452 for testbed in all_restart])
456 #print >>sys.stderr, "DO IT"
460 # cross-connect (cannot be done in parallel)
461 self._logger.debug("ExperimentController: Starting cross-connect")
462 for guid, testbed in self._testbeds.iteritems():
463 cross_data = self._get_cross_data(guid)
464 testbed.do_cross_connect_init(cross_data)
465 for guid, testbed in self._testbeds.iteritems():
466 cross_data = self._get_cross_data(guid)
467 testbed.do_cross_connect_compl(cross_data)
471 # Last chance to configure (parallel on all testbeds)
472 self._logger.debug("ExperimentController: Starting parallel do_prestart")
473 self._parallel([testbed.do_prestart
474 for testbed in all_restart])
476 # final netref step, fail if anything's left unresolved
477 self.do_netrefs(data, fail_if_undefined=True)
482 # update execution xml with execution-specific values
483 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
484 self._update_execute_xml()
485 self.persist_execute_xml()
487 # start experiment (parallel start on all testbeds)
488 self._logger.debug("ExperimentController: Starting parallel do_start")
489 self._parallel([testbed.start
490 for testbed in all_restart])
494 def _clear_caches(self):
495 # Cleaning cache for safety.
496 self._guids_in_testbed_cache = dict()
498 def _persist_testbed_proxies(self):
499 TRANSIENT = (DC.RECOVER,)
501 # persist access configuration for all testbeds, so that
502 # recovery mode can reconnect to them if it becomes necessary
503 conf = ConfigParser.RawConfigParser()
504 for testbed_guid, testbed_config in self._deployment_config.iteritems():
505 testbed_guid = str(testbed_guid)
506 conf.add_section(testbed_guid)
507 for attr in testbed_config.get_attribute_list():
508 if attr not in TRANSIENT:
509 conf.set(testbed_guid, attr,
510 testbed_config.get_attribute_value(attr))
512 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
516 def _load_testbed_proxies(self):
518 Attribute.STRING : 'get',
519 Attribute.BOOL : 'getboolean',
520 Attribute.ENUM : 'get',
521 Attribute.DOUBLE : 'getfloat',
522 Attribute.INTEGER : 'getint',
525 TRANSIENT = (DC.RECOVER,)
527 # deferred import because proxy needs
528 # our class definitions to define proxies
529 import nepi.util.proxy as proxy
531 conf = ConfigParser.RawConfigParser()
532 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
533 for testbed_guid in conf.sections():
534 testbed_config = proxy.AccessConfiguration()
535 testbed_guid = str(testbed_guid)
536 for attr in testbed_config.get_attribute_list():
537 if attr not in TRANSIENT:
538 getter = getattr(conf, TYPEMAP.get(
539 testbed_config.get_attribute_type(attr),
541 testbed_config.set_attribute_value(
542 attr, getter(testbed_guid, attr))
544 def _unpersist_testbed_proxies(self):
546 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
548 # Just print exceptions, this is just cleanup
549 self._logger.exception("Loading testbed configuration")
551 def _update_execute_xml(self):
553 # For all elements in testbed,
554 # - gather immutable execute-readable attribuets lists
556 # Generate new design description from design xml
557 # (Wait for attributes lists - implicit syncpoint)
559 # For all elements in testbed,
560 # - gather all immutable execute-readable attribute
561 # values, asynchronously
562 # (Wait for attribute values - implicit syncpoint)
564 # For all elements in testbed,
565 # - inject non-None values into new design
566 # Generate execute xml from new design
568 attribute_lists = dict(
569 (testbed_guid, collections.defaultdict(dict))
570 for testbed_guid in self._testbeds
573 for testbed_guid, testbed in self._testbeds.iteritems():
574 guids = self._guids_in_testbed(testbed_guid)
576 attribute_lists[testbed_guid][guid] = \
577 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
579 parser = XmlExperimentParser()
580 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
582 attribute_values = dict(
583 (testbed_guid, collections.defaultdict(dict))
584 for testbed_guid in self._testbeds
587 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
588 testbed = self._testbeds[testbed_guid]
589 for guid, attribute_list in testbed_attribute_lists.iteritems():
590 attribute_list = _undefer(attribute_list)
591 attribute_values[testbed_guid][guid] = dict(
592 (attribute, testbed.get_deferred(guid, attribute))
593 for attribute in attribute_list
596 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
597 for guid, attribute_values in testbed_attribute_values.iteritems():
598 for attribute, value in attribute_values.iteritems():
599 value = _undefer(value)
600 if value is not None:
601 execute_data.add_attribute_data(guid, attribute, value)
603 self._experiment_execute_xml = parser.to_xml(data=execute_data)
606 for testbed in self._testbeds.values():
608 self._unpersist_testbed_proxies()
609 self._stopped_time = time.time()
612 # reload perviously persisted testbed access configurations
613 self._failed_testbeds.clear()
614 self._load_testbed_proxies()
616 # re-program testbeds that need recovery
617 self._start(recover = True)
def is_finished(self, guid):
    """
    Tell whether element `guid` reports AS.STATUS_FINISHED.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.status(guid) == AS.STATUS_FINISHED
625 def _testbed_recovery_policy(self, guid, data = None):
627 parser = XmlExperimentParser()
628 data = parser.from_xml_to_data(self._experiment_design_xml)
630 return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
632 def status(self, guid):
633 if guid in self._testbeds:
635 # report testbed status
636 if guid in self._failed_testbeds:
637 return TS.STATUS_FAILED
640 return self._testbeds[guid].status()
642 return TS.STATUS_UNRESPONSIVE
645 testbed = self._testbed_for_guid(guid)
646 if testbed is not None:
647 return testbed.status(guid)
649 return AS.STATUS_UNDETERMINED
def set(self, guid, name, value, time = TIME_NOW):
    """
    Set attribute `name` of element `guid` on the testbed that owns it.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    testbed.set(guid, name, value, time)
def get(self, guid, name, time = TIME_NOW):
    """
    Fetch attribute `name` of element `guid` from the testbed that owns it.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get(guid, name, time)
def get_deferred(self, guid, name, time = TIME_NOW):
    """
    Like get(), but return whatever the owning testbed's get_deferred yields.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get_deferred(guid, name, time)
def get_factory_id(self, guid):
    """
    Return the factory id of element `guid`, as reported by its testbed.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get_factory_id(guid)
def get_testbed_id(self, guid):
    """
    Return the testbed_id of the testbed that owns element `guid`.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.testbed_id
def get_testbed_version(self, guid):
    """
    Return the testbed_version of the testbed that owns element `guid`.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.testbed_version
690 for testbed in self._testbeds.values():
694 exceptions.append(sys.exc_info())
695 for exc_info in exceptions:
696 raise exc_info[0], exc_info[1], exc_info[2]
698 def _testbed_for_guid(self, guid):
699 for testbed_guid in self._testbeds.keys():
700 if guid in self._guids_in_testbed(testbed_guid):
701 if testbed_guid in self._failed_testbeds:
703 return self._testbeds[testbed_guid]
706 def _guids_in_testbed(self, testbed_guid):
707 if testbed_guid not in self._testbeds:
709 if testbed_guid not in self._guids_in_testbed_cache:
710 self._guids_in_testbed_cache[testbed_guid] = \
711 set(self._testbeds[testbed_guid].guids)
712 return self._guids_in_testbed_cache[testbed_guid]
def _netref_component_split(component):
    """
    Split a compound netref component into its kind and index.

    eg: 'addr[0]' -> ('addr', '0'). A component that COMPONENT_PATTERN
    does not match is returned unchanged, paired with a None index.
    """
    match = COMPONENT_PATTERN.match(component)
    if match:
        return match.group("kind"), match.group("index")
    return component, None
722 _NETREF_COMPONENT_GETTERS = {
724 lambda testbed, guid, index, name:
725 testbed.get_address(guid, int(index), name),
727 lambda testbed, guid, index, name:
728 testbed.get_route(guid, int(index), name),
730 lambda testbed, guid, index, name:
731 testbed.trace(guid, index, attribute = name),
733 lambda testbed, guid, index, name:
734 testbed.get(guid, name),
737 def resolve_netref_value(self, value, failval = None):
740 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
741 label = match.group("label")
742 if label.startswith('GUID-'):
743 ref_guid = int(label[5:])
745 expr = match.group("expr")
746 component = (match.group("component") or "")[1:] # skip the dot
747 attribute = match.group("attribute")
749 # split compound components into component kind and index
750 # eg: 'addr[0]' -> ('addr', '0')
751 component, component_index = self._netref_component_split(component)
753 # find object and resolve expression
754 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
755 if component not in self._NETREF_COMPONENT_GETTERS:
756 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
757 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
760 ref_value = self._NETREF_COMPONENT_GETTERS[component](
761 ref_testbed, ref_guid, component_index, attribute)
763 value = rv = value.replace(match.group(), ref_value)
766 # unresolvable netref
773 def do_netrefs(self, data, fail_if_undefined = False):
775 for (testbed_guid, guid), attrs in self._netrefs.items():
776 testbed = self._testbeds.get(testbed_guid)
777 if testbed is not None:
778 for name in set(attrs):
779 value = testbed.get(guid, name)
780 if isinstance(value, basestring):
781 ref_value = self.resolve_netref_value(value)
782 if ref_value is not None:
783 testbed.set(guid, name, ref_value)
785 elif fail_if_undefined:
786 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
788 del self._netrefs[(testbed_guid, guid)]
791 for testbed_guid, attrs in self._testbed_netrefs.items():
792 tb_data = dict(data.get_attribute_data(testbed_guid))
794 for name in set(attrs):
795 value = tb_data.get(name)
796 if isinstance(value, basestring):
797 ref_value = self.resolve_netref_value(value)
798 if ref_value is not None:
799 data.set_attribute_data(testbed_guid, name, ref_value)
801 elif fail_if_undefined:
802 raise ValueError, "Unresolvable netref in: %r" % (value,)
804 del self._testbed_netrefs[testbed_guid]
807 def _init_testbed_controllers(self, data, recover = False):
808 blacklist_testbeds = set(self._testbeds)
809 element_guids = list()
811 data_guids = data.guids
815 # gather label associations
816 for guid in data_guids:
817 if not data.is_testbed_data(guid):
818 (testbed_guid, factory_id) = data.get_box_data(guid)
819 label = data.get_attribute_data(guid, "label")
820 if label is not None:
821 if label in label_guids:
822 raise RuntimeError, "Label %r is not unique" % (label,)
823 label_guids[label] = guid
825 # create testbed controllers
826 for guid in data_guids:
827 if data.is_testbed_data(guid):
828 if guid not in self._testbeds:
830 self._create_testbed_controller(
831 guid, data, element_guids, recover)
834 blacklist_testbeds.add(guid)
839 policy = self._testbed_recovery_policy(guid, data=data)
840 if policy == DC.POLICY_RECOVER:
841 self._create_testbed_controller(
842 guid, data, element_guids, False)
844 elif policy == DC.POLICY_RESTART:
845 self._create_testbed_controller(
846 guid, data, element_guids, False)
850 self._failed_testbeds.add(guid)
854 # queue programmable elements
855 # - that have not been programmed already (blacklist_testbeds)
856 # - including recovered or restarted testbeds
857 # - but those that have no unresolved netrefs
858 for guid in data_guids:
859 if not data.is_testbed_data(guid):
860 (testbed_guid, factory_id) = data.get_box_data(guid)
861 if testbed_guid not in blacklist_testbeds:
862 element_guids.append(guid)
864 # replace references to elements labels for its guid
865 self._resolve_labels(data, data_guids, label_guids)
867 # program testbed controllers
869 self._program_testbed_controllers(element_guids, data)
871 return to_recover, to_restart
873 def _resolve_labels(self, data, data_guids, label_guids):
874 netrefs = self._netrefs
875 testbed_netrefs = self._testbed_netrefs
876 for guid in data_guids:
877 for name, value in data.get_attribute_data(guid):
878 if isinstance(value, basestring):
880 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
881 label = match.group("label")
882 if not label.startswith('GUID-'):
883 ref_guid = label_guids.get(label)
884 if ref_guid is not None:
885 value = value.replace(
887 ATTRIBUTE_PATTERN_GUID_SUB % dict(
888 guid = 'GUID-%d' % (ref_guid,),
889 expr = match.group("expr"),
892 data.set_attribute_data(guid, name, value)
894 # memorize which guid-attribute pairs require
895 # postprocessing, to avoid excessive controller-testbed
896 # communication at configuration time
897 # (which could require high-latency network I/O)
898 if not data.is_testbed_data(guid):
899 (testbed_guid, factory_id) = data.get_box_data(guid)
900 netrefs[(testbed_guid, guid)].add(name)
902 testbed_netrefs[guid].add(name)
908 def _create_testbed_controller(self, guid, data, element_guids, recover):
909 (testbed_id, testbed_version) = data.get_testbed_data(guid)
910 deployment_config = self._deployment_config.get(guid)
912 # deferred import because proxy needs
913 # our class definitions to define proxies
914 import nepi.util.proxy as proxy
916 if deployment_config is None:
918 deployment_config = proxy.AccessConfiguration()
920 for (name, value) in data.get_attribute_data(guid):
921 if value is not None and deployment_config.has_attribute(name):
922 # if any deployment config attribute has a netref, we can't
923 # create this controller yet
924 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
925 # remember to re-issue this one
926 self._netreffed_testbeds.add(guid)
929 # copy deployment config attribute
930 deployment_config.set_attribute_value(name, value)
933 self._deployment_config[guid] = deployment_config
935 if deployment_config is not None:
936 # force recovery mode
937 deployment_config.set_attribute_value("recover",recover)
939 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
941 for (name, value) in data.get_attribute_data(guid):
942 testbed.defer_configure(name, value)
943 self._testbeds[guid] = testbed
944 if guid in self._netreffed_testbeds:
945 self._netreffed_testbeds.remove(guid)
947 def _program_testbed_controllers(self, element_guids, data):
948 def resolve_create_netref(data, guid, name, value):
949 # Try to resolve create-time netrefs, if possible
950 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
952 nuvalue = self.resolve_netref_value(value)
954 # Any trouble means we're not in shape to resolve the netref yet
956 if nuvalue is not None:
957 # Only if we succeed we remove the netref deferral entry
959 data.set_attribute_data(guid, name, value)
960 if (testbed_guid, guid) in self._netrefs:
961 self._netrefs[(testbed_guid, guid)].discard(name)
964 for guid in element_guids:
965 (testbed_guid, factory_id) = data.get_box_data(guid)
966 testbed = self._testbeds.get(testbed_guid)
967 if testbed is not None:
969 testbed.defer_create(guid, factory_id)
971 for (name, value) in data.get_attribute_data(guid):
972 value = resolve_create_netref(data, guid, name, value)
973 testbed.defer_create_set(guid, name, value)
975 for guid in element_guids:
976 (testbed_guid, factory_id) = data.get_box_data(guid)
977 testbed = self._testbeds.get(testbed_guid)
978 if testbed is not None:
980 for trace_id in data.get_trace_data(guid):
981 testbed.defer_add_trace(guid, trace_id)
983 for (address, netprefix, broadcast) in data.get_address_data(guid):
985 testbed.defer_add_address(guid, address, netprefix,
988 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
989 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
990 # store connections data
991 for (connector_type_name, other_guid, other_connector_type_name) \
992 in data.get_connection_data(guid):
993 (other_testbed_guid, other_factory_id) = data.get_box_data(
995 if testbed_guid == other_testbed_guid:
996 # each testbed should take care of enforcing internal
997 # connection simmetry, so each connection is only
998 # added in one direction
999 testbed.defer_connect(guid, connector_type_name,
1000 other_guid, other_connector_type_name)
1002 def _program_testbed_cross_connections(self, data):
1003 data_guids = data.guids
1004 for guid in data_guids:
1005 if not data.is_testbed_data(guid):
1006 (testbed_guid, factory_id) = data.get_box_data(guid)
1007 testbed = self._testbeds.get(testbed_guid)
1008 if testbed is not None:
1009 for (connector_type_name, cross_guid, cross_connector_type_name) \
1010 in data.get_connection_data(guid):
1011 (testbed_guid, factory_id) = data.get_box_data(guid)
1012 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1014 if testbed_guid != cross_testbed_guid:
1015 cross_testbed = self._testbeds[cross_testbed_guid]
1016 cross_testbed_id = cross_testbed.testbed_id
1017 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
1018 cross_testbed_guid, cross_testbed_id, cross_factory_id,
1019 cross_connector_type_name)
1020 # save cross data for later
1021 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1024 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1025 if testbed_guid not in self._cross_data:
1026 self._cross_data[testbed_guid] = dict()
1027 if cross_testbed_guid not in self._cross_data[testbed_guid]:
1028 self._cross_data[testbed_guid][cross_testbed_guid] = set()
1029 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1031 def _get_cross_data(self, testbed_guid):
1033 if not testbed_guid in self._cross_data:
1036 # fetch attribute lists in one batch
1037 attribute_lists = dict()
1038 for cross_testbed_guid, guid_list in \
1039 self._cross_data[testbed_guid].iteritems():
1040 cross_testbed = self._testbeds[cross_testbed_guid]
1041 for cross_guid in guid_list:
1042 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1043 cross_testbed.get_attribute_list_deferred(cross_guid)
1045 # fetch attribute values in another batch
1046 for cross_testbed_guid, guid_list in \
1047 self._cross_data[testbed_guid].iteritems():
1048 cross_data[cross_testbed_guid] = dict()
1049 cross_testbed = self._testbeds[cross_testbed_guid]
1050 for cross_guid in guid_list:
1051 elem_cross_data = dict(
1053 _testbed_guid = cross_testbed_guid,
1054 _testbed_id = cross_testbed.testbed_id,
1055 _testbed_version = cross_testbed.testbed_version)
1056 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1057 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1058 for attr_name in attribute_list:
1059 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1060 elem_cross_data[attr_name] = attr_value
1062 # undefer all values - we'll have to serialize them probably later
1063 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1064 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1065 for attr_name, attr_value in elem_cross_data.iteritems():
1066 elem_cross_data[attr_name] = _undefer(attr_value)
1070 class ExperimentSuite(object):
def __init__(self, experiment_xml, access_config, repetitions,
        duration, wait_guids):
    """
    Store the parameters of a suite of repeated experiment runs.

    experiment_xml: experiment description handed to each run.
    access_config: access configuration whose attributes are copied per run.
    repetitions: number of runs to perform.
    duration: experiment duration, compared against elapsed seconds.
    wait_guids: guids passed to the controller's is_finished during a run.
    """
    # what to run
    self._experiment_xml = experiment_xml
    self._access_config = access_config
    # how to run it
    self._repetitions = repetitions
    self._duration = duration
    self._wait_guids = wait_guids
    # bookkeeping
    self._experiments = dict()
    self._current = None
    self._status = TS.STATUS_ZERO
1084 self._status = TS.STATUS_STARTED
1085 self._thread = threading.Thread(target = self._run_experiment_suite)
1086 self._thread.start()
1093 def _run_experiment_suite(self):
1094 for i in xrange[0, self.repetitions]:
1096 self._run_one_experiment()
1098 def _run_one_experiment(self):
1099 access_config = proxy.AccessConfiguration()
1100 for attr in self._access_config.attributes:
1101 access_config.set_attribute_value(attr.name, attr.value)
1102 access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1103 root_dir = "%s_%d" % (
1104 access_config.get_attribute_value(DC.ROOT_DIRECTORY),
1106 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1107 controller = proxy.create_experiment_controller(self._experiment_xml,
1109 self._experiments[self._current] = controller
1111 started_at = time.time()
1112 # wait until all specified guids have finished execution
1113 if self._wait_guids:
1114 while all(itertools.imap(controller.is_finished, self._wait_guids):
1116 # wait until the minimum experiment duration time has elapsed
1118 while (time.time() - started_at) < self._duration:
1122 controller.shutdown()