2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
19 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
20 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
21 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
23 def _undefer(deferred):
24 if hasattr(deferred, '_get'):
25 return deferred._get()
30 class TestbedController(object):
31 def __init__(self, testbed_id, testbed_version):
32 self._testbed_id = testbed_id
33 self._testbed_version = testbed_version
37 return self._testbed_id
40 def testbed_version(self):
41 return self._testbed_version
45 raise NotImplementedError
def defer_configure(self, name, value):
    """Instructs setting a configuration attribute for the testbed instance.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create(self, guid, factory_id):
    """Instructs creation of the element ``guid`` from factory ``factory_id``.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create_set(self, guid, name, value):
    """Instructs setting an initial attribute on an element.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_factory_set(self, guid, name, value):
    """Instructs setting an attribute on a factory.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_connect(self, guid1, connector_type_name1, guid2,
        connector_type_name2):
    """Instructs creation of a connection between the given connectors.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
68 def defer_cross_connect(self,
69 guid, connector_type_name,
70 cross_guid, cross_testbed_guid,
71 cross_testbed_id, cross_factory_id,
72 cross_connector_type_name):
74 Instructs creation of a connection between the given connectors
75 of different testbed instances
77 raise NotImplementedError
def defer_add_trace(self, guid, trace_id):
    """Instructs the addition of the trace ``trace_id`` to element ``guid``.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Instructs the addition of an address to element ``guid``.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
    """Instructs the addition of a route to element ``guid``.

    ``metric`` is optional and defaults to 0.
    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
92 """After do_setup the testbed initial configuration is done"""
93 raise NotImplementedError
97 After do_create all instructed elements are created and
100 raise NotImplementedError
102 def do_connect_init(self):
104 After do_connect_init all internal connections between testbed elements
107 raise NotImplementedError
109 def do_connect_compl(self):
111 After do_connect all internal connections between testbed elements
114 raise NotImplementedError
116 def do_preconfigure(self):
118 Done just before resolving netrefs, after connection, before cross connections,
119 useful for early stages of configuration, for setting up stuff that might be
120 required for netref resolution.
122 raise NotImplementedError
def do_configure(self):
    """After do_configure elements are configured.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def do_prestart(self):
    """Before do_start elements are prestart-configured.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
132 def do_cross_connect_init(self, cross_data):
134 After do_cross_connect_init initiation of all external connections
135 between different testbed elements is performed
137 raise NotImplementedError
139 def do_cross_connect_compl(self, cross_data):
141 After do_cross_connect_compl completion of all external connections
142 between different testbed elements is performed
144 raise NotImplementedError
147 raise NotImplementedError
150 raise NotImplementedError
154 On testbed recovery (if recovery is a supported policy), the controller
155 instance will be re-created and the following sequence invoked:
158 defer_X - programming the testbed with persisted execution values
159 (not design values). Execution values (ExecImmutable attributes)
160 should be enough to recreate the testbed's state.
162 <cross-connection methods>
164 Start will not be called, and after cross connection invocations,
165 the testbed is supposed to be fully functional again.
167 raise NotImplementedError
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute ``name`` of element ``guid`` to ``value``.

    ``time`` defaults to TIME_NOW.
    NOTE(review): how ``time`` is honoured is up to concrete controllers.
    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def get(self, guid, name, time = TIME_NOW):
    """Fetch the value of attribute ``name`` of element ``guid``.

    ``time`` defaults to TIME_NOW.
    NOTE(review): how ``time`` is honoured is up to concrete controllers.
    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
175 def get_route(self, guid, index, attribute):
179 guid: guid of box to query
180 index: number of routing entry to fetch
181 attribute: one of Destination, NextHop, NetPrefix
183 raise NotImplementedError
185 def get_address(self, guid, index, attribute='Address'):
189 guid: guid of box to query
190 index: number of inteface to select
191 attribute: one of Address, NetPrefix, Broadcast
193 raise NotImplementedError
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """List the attributes of element ``guid``.

    NOTE(review): ``filter_flags`` / ``exclude`` presumably select or reject
    attributes by flag -- confirm against a concrete controller.
    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def get_factory_id(self, guid):
    """Report the factory id of element ``guid``.

    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def action(self, time, guid, action):
    """Apply ``action`` to element ``guid`` at ``time``.

    NOTE(review): the set of valid actions is not visible here -- confirm.
    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
def status(self, guid = None):
    """Report the status of element ``guid``.

    ``guid`` is optional because ExperimentController also invokes
    ``status()`` with no argument when it reports on a whole testbed.
    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
207 def testbed_status(self):
208 raise NotImplementedError
def trace(self, guid, trace_id, attribute='value'):
    """Fetch ``attribute`` of trace ``trace_id`` on element ``guid``.

    ``attribute`` defaults to 'value'.
    Abstract hook: the base implementation only raises NotImplementedError.
    """
    raise NotImplementedError()
213 def traces_info(self):
214 """ dictionary of dictionaries:
220 filesize = size in bytes,
224 raise NotImplementedError
227 raise NotImplementedError
229 class ExperimentController(object):
230 def __init__(self, experiment_xml, root_dir):
231 self._experiment_design_xml = experiment_xml
232 self._experiment_execute_xml = None
233 self._testbeds = dict()
234 self._deployment_config = dict()
235 self._netrefs = collections.defaultdict(set)
236 self._testbed_netrefs = collections.defaultdict(set)
237 self._cross_data = dict()
238 self._root_dir = root_dir
239 self._netreffed_testbeds = set()
240 self._guids_in_testbed_cache = dict()
241 self._failed_testbeds = set()
242 self._started_time = None
243 self._stopped_time = None
245 self._logger = logging.getLogger('nepi.core.execute')
246 level = logging.ERROR
247 if os.environ.get("NEPI_CONTROLLER_LOGLEVEL",
248 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
249 level = logging.DEBUG
250 self._logger.setLevel(level)
252 if experiment_xml is None and root_dir is not None:
254 self.load_experiment_xml()
255 self.load_execute_xml()
257 self.persist_experiment_xml()
260 def experiment_design_xml(self):
261 return self._experiment_design_xml
264 def experiment_execute_xml(self):
265 return self._experiment_execute_xml
268 def started_time(self):
269 return self._started_time
272 def stopped_time(self):
273 return self._stopped_time
278 for testbed_guid in self._testbeds.keys():
279 _guids = self._guids_in_testbed(testbed_guid)
def persist_experiment_xml(self):
    """Write the design XML to ``experiment-design.xml`` in the root dir.

    The file is handled through a context manager so it is flushed and
    closed even when the write fails.
    """
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    with open(xml_path, "w") as f:
        f.write(self._experiment_design_xml)
def persist_execute_xml(self):
    """Write the execution XML to ``experiment-execute.xml`` in the root dir.

    The file is handled through a context manager so it is flushed and
    closed even when the write fails.
    """
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    with open(xml_path, "w") as f:
        f.write(self._experiment_execute_xml)
def load_experiment_xml(self):
    """Load the design XML from ``experiment-design.xml`` in the root dir.

    The content is stored in ``self._experiment_design_xml``; the file is
    closed as soon as it has been read.
    """
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    with open(xml_path, "r") as f:
        self._experiment_design_xml = f.read()
def load_execute_xml(self):
    """Load the execution XML from ``experiment-execute.xml`` in the root dir.

    The content is stored in ``self._experiment_execute_xml``; the file is
    closed as soon as it has been read.
    """
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    with open(xml_path, "r") as f:
        self._experiment_execute_xml = f.read()
def trace(self, guid, trace_id, attribute='value'):
    """Fetch ``attribute`` of trace ``trace_id`` from the element ``guid``.

    The call is forwarded to the testbed that owns ``guid``.

    Raises RuntimeError when no testbed can be found for ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        # Without this guard the lookup failure surfaces as an
        # AttributeError on None and the RuntimeError is never reached.
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.trace(guid, trace_id, attribute)
314 def traces_info(self):
316 for guid, testbed in self._testbeds.iteritems():
317 tinfo = testbed.traces_info()
319 traces_info[guid] = testbed.traces_info()
323 def _parallel(callables):
326 @functools.wraps(callable)
327 def wrapped(*p, **kw):
331 logging.exception("Exception occurred in asynchronous thread:")
332 excs.append(sys.exc_info())
334 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
335 for thread in threads:
337 for thread in threads:
340 eTyp, eVal, eLoc = exc
341 raise eTyp, eVal, eLoc
344 self._started_time = time.time()
347 def _start(self, recover = False):
348 parser = XmlExperimentParser()
351 xml = self._experiment_execute_xml
353 xml = self._experiment_design_xml
354 data = parser.from_xml_to_data(xml)
356 # instantiate testbed controllers
357 to_recover, to_restart = self._init_testbed_controllers(data, recover)
358 all_restart = set(to_restart)
361 # persist testbed connection data, for potential recovery
362 self._persist_testbed_proxies()
364 # recover recoverable controllers
365 for guid in to_recover:
367 self._testbeds[guid].do_setup()
368 self._testbeds[guid].recover()
370 self._logger.exception("During recovery of testbed %s", guid)
373 self._failed_testbeds.add(guid)
375 def steps_to_configure(self, allowed_guids):
376 # perform setup in parallel for all test beds,
377 # wait for all threads to finish
379 self._logger.debug("ExperimentController: Starting parallel do_setup")
380 self._parallel([testbed.do_setup
381 for guid,testbed in self._testbeds.iteritems()
382 if guid in allowed_guids])
384 # perform create-connect in parallel, wait
385 # (internal connections only)
386 self._logger.debug("ExperimentController: Starting parallel do_create")
387 self._parallel([testbed.do_create
388 for guid,testbed in self._testbeds.iteritems()
389 if guid in allowed_guids])
391 self._logger.debug("ExperimentController: Starting parallel do_connect_init")
392 self._parallel([testbed.do_connect_init
393 for guid,testbed in self._testbeds.iteritems()
394 if guid in allowed_guids])
396 self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
397 self._parallel([testbed.do_connect_compl
398 for guid,testbed in self._testbeds.iteritems()
399 if guid in allowed_guids])
401 self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
402 self._parallel([testbed.do_preconfigure
403 for guid,testbed in self._testbeds.iteritems()
404 if guid in allowed_guids])
407 steps_to_configure(self, to_restart)
409 if self._netreffed_testbeds:
410 self._logger.debug("ExperimentController: Resolving netreffed testbeds")
411 # initally resolve netrefs
412 self.do_netrefs(data, fail_if_undefined=False)
414 # rinse and repeat, for netreffed testbeds
415 netreffed_testbeds = set(self._netreffed_testbeds)
417 to_recover, to_restart = self._init_testbed_controllers(data, recover)
418 all_restart.update(to_restart)
421 # persist testbed connection data, for potential recovery
422 self._persist_testbed_proxies()
424 # recover recoverable controllers
425 for guid in to_recover:
427 self._testbeds[guid].do_setup()
428 self._testbeds[guid].recover()
430 self._logger.exception("During recovery of testbed %s", guid)
433 self._failed_testbeds.add(guid)
435 # configure dependant testbeds
436 steps_to_configure(self, to_restart)
438 all_restart = [ self._testbeds[guid] for guid in all_restart ]
440 # final netref step, fail if anything's left unresolved
441 self._logger.debug("ExperimentController: Resolving do_netrefs")
442 self.do_netrefs(data, fail_if_undefined=False)
444 # Only now, that netref dependencies have been solve, it is safe to
445 # program cross_connections
446 self._logger.debug("ExperimentController: Programming testbed cross-connections")
447 self._program_testbed_cross_connections(data)
449 # perform do_configure in parallel for al testbeds
450 # (it's internal configuration for each)
451 self._logger.debug("ExperimentController: Starting parallel do_configure")
452 self._parallel([testbed.do_configure
453 for testbed in all_restart])
457 #print >>sys.stderr, "DO IT"
461 # cross-connect (cannot be done in parallel)
462 self._logger.debug("ExperimentController: Starting cross-connect")
463 for guid, testbed in self._testbeds.iteritems():
464 cross_data = self._get_cross_data(guid)
465 testbed.do_cross_connect_init(cross_data)
466 for guid, testbed in self._testbeds.iteritems():
467 cross_data = self._get_cross_data(guid)
468 testbed.do_cross_connect_compl(cross_data)
472 # Last chance to configure (parallel on all testbeds)
473 self._logger.debug("ExperimentController: Starting parallel do_prestart")
474 self._parallel([testbed.do_prestart
475 for testbed in all_restart])
477 # final netref step, fail if anything's left unresolved
478 self.do_netrefs(data, fail_if_undefined=True)
483 # update execution xml with execution-specific values
484 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
485 self._update_execute_xml()
486 self.persist_execute_xml()
488 # start experiment (parallel start on all testbeds)
489 self._logger.debug("ExperimentController: Starting parallel do_start")
490 self._parallel([testbed.start
491 for testbed in all_restart])
495 def _clear_caches(self):
496 # Cleaning cache for safety.
497 self._guids_in_testbed_cache = dict()
def _persist_testbed_proxies(self):
    """Save every testbed's access configuration to ``deployment_config.ini``.

    One ini section is written per testbed guid, holding all of its
    deployment attributes except the transient ones (DC.RECOVER). The file
    lives in the root directory and is the one _load_testbed_proxies reads.
    """
    TRANSIENT = (DC.RECOVER,)

    # persist access configuration for all testbeds, so that
    # recovery mode can reconnect to them if it becomes necessary
    conf = ConfigParser.RawConfigParser()
    for testbed_guid, testbed_config in self._deployment_config.iteritems():
        section = str(testbed_guid)
        conf.add_section(section)
        for attr in testbed_config.get_attribute_list():
            if attr not in TRANSIENT:
                conf.set(section, attr,
                    testbed_config.get_attribute_value(attr))

    # Actually serialise the configuration; the context manager guarantees
    # the handle is flushed and closed.
    ini_path = os.path.join(self._root_dir, 'deployment_config.ini')
    with open(ini_path, 'w') as f:
        conf.write(f)
517 def _load_testbed_proxies(self):
519 Attribute.STRING : 'get',
520 Attribute.BOOL : 'getboolean',
521 Attribute.ENUM : 'get',
522 Attribute.DOUBLE : 'getfloat',
523 Attribute.INTEGER : 'getint',
526 TRANSIENT = (DC.RECOVER,)
528 # deferred import because proxy needs
529 # our class definitions to define proxies
530 import nepi.util.proxy as proxy
532 conf = ConfigParser.RawConfigParser()
533 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
534 for testbed_guid in conf.sections():
535 testbed_config = proxy.AccessConfiguration()
536 testbed_guid = str(testbed_guid)
537 for attr in testbed_config.get_attribute_list():
538 if attr not in TRANSIENT:
539 getter = getattr(conf, TYPEMAP.get(
540 testbed_config.get_attribute_type(attr),
542 testbed_config.set_attribute_value(
543 attr, getter(testbed_guid, attr))
545 def _unpersist_testbed_proxies(self):
547 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
549 # Just print exceptions, this is just cleanup
550 self._logger.exception("Loading testbed configuration")
552 def _update_execute_xml(self):
554 # For all elements in testbed,
555 # - gather immutable execute-readable attribuets lists
557 # Generate new design description from design xml
558 # (Wait for attributes lists - implicit syncpoint)
560 # For all elements in testbed,
561 # - gather all immutable execute-readable attribute
562 # values, asynchronously
563 # (Wait for attribute values - implicit syncpoint)
565 # For all elements in testbed,
566 # - inject non-None values into new design
567 # Generate execute xml from new design
569 attribute_lists = dict(
570 (testbed_guid, collections.defaultdict(dict))
571 for testbed_guid in self._testbeds
574 for testbed_guid, testbed in self._testbeds.iteritems():
575 guids = self._guids_in_testbed(testbed_guid)
577 attribute_lists[testbed_guid][guid] = \
578 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
580 parser = XmlExperimentParser()
581 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
583 attribute_values = dict(
584 (testbed_guid, collections.defaultdict(dict))
585 for testbed_guid in self._testbeds
588 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
589 testbed = self._testbeds[testbed_guid]
590 for guid, attribute_list in testbed_attribute_lists.iteritems():
591 attribute_list = _undefer(attribute_list)
592 attribute_values[testbed_guid][guid] = dict(
593 (attribute, testbed.get_deferred(guid, attribute))
594 for attribute in attribute_list
597 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
598 for guid, attribute_values in testbed_attribute_values.iteritems():
599 for attribute, value in attribute_values.iteritems():
600 value = _undefer(value)
601 if value is not None:
602 execute_data.add_attribute_data(guid, attribute, value)
604 self._experiment_execute_xml = parser.to_xml(data=execute_data)
607 for testbed in self._testbeds.values():
609 self._unpersist_testbed_proxies()
610 self._stopped_time = time.time()
613 # reload perviously persisted testbed access configurations
614 self._failed_testbeds.clear()
615 self._load_testbed_proxies()
617 # re-program testbeds that need recovery
618 self._start(recover = True)
def is_finished(self, guid):
    """Tell whether the element ``guid`` reports AS.STATUS_FINISHED.

    Raises RuntimeError when no testbed can be found for ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        # Guard first: otherwise the failure is an AttributeError on None
        # and the RuntimeError below the return is unreachable.
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.status(guid) == AS.STATUS_FINISHED
def _testbed_recovery_policy(self, guid, data = None):
    """Return the DC.RECOVERY_POLICY attribute of testbed ``guid``.

    ``data`` is the parsed experiment description to query. When it is not
    supplied, the design XML is parsed on the spot.
    """
    if data is None:
        # Only pay for a re-parse when the caller did not hand us the data.
        parser = XmlExperimentParser()
        data = parser.from_xml_to_data(self._experiment_design_xml)

    return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
633 def status(self, guid):
634 if guid in self._testbeds:
636 # report testbed status
637 if guid in self._failed_testbeds:
638 return TS.STATUS_FAILED
641 return self._testbeds[guid].status()
643 return TS.STATUS_UNRESPONSIVE
646 testbed = self._testbed_for_guid(guid)
647 if testbed is not None:
648 return testbed.status(guid)
650 return AS.STATUS_UNDETERMINED
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute ``name`` of element ``guid`` through its testbed.

    Raises RuntimeError when no testbed can be found for ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    # Successful path: forward and return normally (no trailing raise).
    testbed.set(guid, name, value, time)
def get(self, guid, name, time = TIME_NOW):
    """Fetch attribute ``name`` of element ``guid`` through its testbed.

    Raises RuntimeError when no testbed can be found for ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get(guid, name, time)
def get_deferred(self, guid, name, time = TIME_NOW):
    """Forward to the owning testbed's ``get_deferred`` for element ``guid``.

    Returns whatever the testbed's ``get_deferred`` returns.

    Raises RuntimeError when no testbed can be found for ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get_deferred(guid, name, time)
def get_factory_id(self, guid):
    """Return the factory id of element ``guid`` as reported by its testbed.

    Raises RuntimeError when no testbed can be found for ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get_factory_id(guid)
def get_testbed_id(self, guid):
    """Return the ``testbed_id`` of the testbed that owns element ``guid``.

    Raises RuntimeError when no testbed can be found for ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.testbed_id
def get_testbed_version(self, guid):
    """Return the ``testbed_version`` of the testbed owning element ``guid``.

    Raises RuntimeError when no testbed can be found for ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.testbed_version
691 for testbed in self._testbeds.values():
695 exceptions.append(sys.exc_info())
696 for exc_info in exceptions:
697 raise exc_info[0], exc_info[1], exc_info[2]
699 def _testbed_for_guid(self, guid):
700 for testbed_guid in self._testbeds.keys():
701 if guid in self._guids_in_testbed(testbed_guid):
702 if testbed_guid in self._failed_testbeds:
704 return self._testbeds[testbed_guid]
707 def _guids_in_testbed(self, testbed_guid):
708 if testbed_guid not in self._testbeds:
710 if testbed_guid not in self._guids_in_testbed_cache:
711 self._guids_in_testbed_cache[testbed_guid] = \
712 set(self._testbeds[testbed_guid].guids)
713 return self._guids_in_testbed_cache[testbed_guid]
def _netref_component_split(component):
    """Split a netref component into ``(kind, index)``.

    A compound component matching COMPONENT_PATTERN, e.g. ``'addr[0]'``,
    yields ``('addr', '0')``. Anything else (including the empty string)
    is returned as ``(component, None)``.

    NOTE(review): takes no ``self`` yet is invoked as
    ``self._netref_component_split(component)``, so it presumably carries a
    ``@staticmethod`` decorator above this definition -- confirm.
    """
    match = COMPONENT_PATTERN.match(component)
    if match is None:
        # Simple (non-indexed) component: nothing to split.
        return component, None
    return match.group("kind"), match.group("index")
723 _NETREF_COMPONENT_GETTERS = {
725 lambda testbed, guid, index, name:
726 testbed.get_address(guid, int(index), name),
728 lambda testbed, guid, index, name:
729 testbed.get_route(guid, int(index), name),
731 lambda testbed, guid, index, name:
732 testbed.trace(guid, index, attribute = name),
734 lambda testbed, guid, index, name:
735 testbed.get(guid, name),
738 def resolve_netref_value(self, value, failval = None):
741 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
742 label = match.group("label")
743 if label.startswith('GUID-'):
744 ref_guid = int(label[5:])
746 expr = match.group("expr")
747 component = (match.group("component") or "")[1:] # skip the dot
748 attribute = match.group("attribute")
750 # split compound components into component kind and index
751 # eg: 'addr[0]' -> ('addr', '0')
752 component, component_index = self._netref_component_split(component)
754 # find object and resolve expression
755 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
756 if component not in self._NETREF_COMPONENT_GETTERS:
757 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
758 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
761 ref_value = self._NETREF_COMPONENT_GETTERS[component](
762 ref_testbed, ref_guid, component_index, attribute)
764 value = rv = value.replace(match.group(), ref_value)
767 # unresolvable netref
774 def do_netrefs(self, data, fail_if_undefined = False):
776 for (testbed_guid, guid), attrs in self._netrefs.items():
777 testbed = self._testbeds.get(testbed_guid)
778 if testbed is not None:
779 for name in set(attrs):
780 value = testbed.get(guid, name)
781 if isinstance(value, basestring):
782 ref_value = self.resolve_netref_value(value)
783 if ref_value is not None:
784 testbed.set(guid, name, ref_value)
786 elif fail_if_undefined:
787 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
789 del self._netrefs[(testbed_guid, guid)]
792 for testbed_guid, attrs in self._testbed_netrefs.items():
793 tb_data = dict(data.get_attribute_data(testbed_guid))
795 for name in set(attrs):
796 value = tb_data.get(name)
797 if isinstance(value, basestring):
798 ref_value = self.resolve_netref_value(value)
799 if ref_value is not None:
800 data.set_attribute_data(testbed_guid, name, ref_value)
802 elif fail_if_undefined:
803 raise ValueError, "Unresolvable netref in: %r" % (value,)
805 del self._testbed_netrefs[testbed_guid]
808 def _init_testbed_controllers(self, data, recover = False):
809 blacklist_testbeds = set(self._testbeds)
810 element_guids = list()
812 data_guids = data.guids
816 # gather label associations
817 for guid in data_guids:
818 if not data.is_testbed_data(guid):
819 (testbed_guid, factory_id) = data.get_box_data(guid)
820 label = data.get_attribute_data(guid, "label")
821 if label is not None:
822 if label in label_guids:
823 raise RuntimeError, "Label %r is not unique" % (label,)
824 label_guids[label] = guid
826 # create testbed controllers
827 for guid in data_guids:
828 if data.is_testbed_data(guid):
829 if guid not in self._testbeds:
831 self._create_testbed_controller(
832 guid, data, element_guids, recover)
835 blacklist_testbeds.add(guid)
840 policy = self._testbed_recovery_policy(guid, data=data)
841 if policy == DC.POLICY_RECOVER:
842 self._create_testbed_controller(
843 guid, data, element_guids, False)
845 elif policy == DC.POLICY_RESTART:
846 self._create_testbed_controller(
847 guid, data, element_guids, False)
851 self._failed_testbeds.add(guid)
855 # queue programmable elements
856 # - that have not been programmed already (blacklist_testbeds)
857 # - including recovered or restarted testbeds
858 # - but those that have no unresolved netrefs
859 for guid in data_guids:
860 if not data.is_testbed_data(guid):
861 (testbed_guid, factory_id) = data.get_box_data(guid)
862 if testbed_guid not in blacklist_testbeds:
863 element_guids.append(guid)
865 # replace references to elements labels for its guid
866 self._resolve_labels(data, data_guids, label_guids)
868 # program testbed controllers
870 self._program_testbed_controllers(element_guids, data)
872 return to_recover, to_restart
874 def _resolve_labels(self, data, data_guids, label_guids):
875 netrefs = self._netrefs
876 testbed_netrefs = self._testbed_netrefs
877 for guid in data_guids:
878 for name, value in data.get_attribute_data(guid):
879 if isinstance(value, basestring):
881 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
882 label = match.group("label")
883 if not label.startswith('GUID-'):
884 ref_guid = label_guids.get(label)
885 if ref_guid is not None:
886 value = value.replace(
888 ATTRIBUTE_PATTERN_GUID_SUB % dict(
889 guid = 'GUID-%d' % (ref_guid,),
890 expr = match.group("expr"),
893 data.set_attribute_data(guid, name, value)
895 # memorize which guid-attribute pairs require
896 # postprocessing, to avoid excessive controller-testbed
897 # communication at configuration time
898 # (which could require high-latency network I/O)
899 if not data.is_testbed_data(guid):
900 (testbed_guid, factory_id) = data.get_box_data(guid)
901 netrefs[(testbed_guid, guid)].add(name)
903 testbed_netrefs[guid].add(name)
909 def _create_testbed_controller(self, guid, data, element_guids, recover):
910 (testbed_id, testbed_version) = data.get_testbed_data(guid)
911 deployment_config = self._deployment_config.get(guid)
913 # deferred import because proxy needs
914 # our class definitions to define proxies
915 import nepi.util.proxy as proxy
917 if deployment_config is None:
919 deployment_config = proxy.AccessConfiguration()
921 for (name, value) in data.get_attribute_data(guid):
922 if value is not None and deployment_config.has_attribute(name):
923 # if any deployment config attribute has a netref, we can't
924 # create this controller yet
925 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
926 # remember to re-issue this one
927 self._netreffed_testbeds.add(guid)
930 # copy deployment config attribute
931 deployment_config.set_attribute_value(name, value)
934 self._deployment_config[guid] = deployment_config
936 if deployment_config is not None:
937 # force recovery mode
938 deployment_config.set_attribute_value("recover",recover)
940 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
942 for (name, value) in data.get_attribute_data(guid):
943 testbed.defer_configure(name, value)
944 self._testbeds[guid] = testbed
945 if guid in self._netreffed_testbeds:
946 self._netreffed_testbeds.remove(guid)
948 def _program_testbed_controllers(self, element_guids, data):
949 def resolve_create_netref(data, guid, name, value):
950 # Try to resolve create-time netrefs, if possible
951 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
953 nuvalue = self.resolve_netref_value(value)
955 # Any trouble means we're not in shape to resolve the netref yet
957 if nuvalue is not None:
958 # Only if we succeed we remove the netref deferral entry
960 data.set_attribute_data(guid, name, value)
961 if (testbed_guid, guid) in self._netrefs:
962 self._netrefs[(testbed_guid, guid)].discard(name)
965 for guid in element_guids:
966 (testbed_guid, factory_id) = data.get_box_data(guid)
967 testbed = self._testbeds.get(testbed_guid)
968 if testbed is not None:
970 testbed.defer_create(guid, factory_id)
972 for (name, value) in data.get_attribute_data(guid):
973 value = resolve_create_netref(data, guid, name, value)
974 testbed.defer_create_set(guid, name, value)
976 for guid in element_guids:
977 (testbed_guid, factory_id) = data.get_box_data(guid)
978 testbed = self._testbeds.get(testbed_guid)
979 if testbed is not None:
981 for trace_id in data.get_trace_data(guid):
982 testbed.defer_add_trace(guid, trace_id)
984 for (address, netprefix, broadcast) in data.get_address_data(guid):
986 testbed.defer_add_address(guid, address, netprefix,
989 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
990 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
991 # store connections data
992 for (connector_type_name, other_guid, other_connector_type_name) \
993 in data.get_connection_data(guid):
994 (other_testbed_guid, other_factory_id) = data.get_box_data(
996 if testbed_guid == other_testbed_guid:
997 # each testbed should take care of enforcing internal
998 # connection simmetry, so each connection is only
999 # added in one direction
1000 testbed.defer_connect(guid, connector_type_name,
1001 other_guid, other_connector_type_name)
1003 def _program_testbed_cross_connections(self, data):
1004 data_guids = data.guids
1005 for guid in data_guids:
1006 if not data.is_testbed_data(guid):
1007 (testbed_guid, factory_id) = data.get_box_data(guid)
1008 testbed = self._testbeds.get(testbed_guid)
1009 if testbed is not None:
1010 for (connector_type_name, cross_guid, cross_connector_type_name) \
1011 in data.get_connection_data(guid):
1012 (testbed_guid, factory_id) = data.get_box_data(guid)
1013 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1015 if testbed_guid != cross_testbed_guid:
1016 cross_testbed = self._testbeds[cross_testbed_guid]
1017 cross_testbed_id = cross_testbed.testbed_id
1018 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
1019 cross_testbed_guid, cross_testbed_id, cross_factory_id,
1020 cross_connector_type_name)
1021 # save cross data for later
1022 self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1023 (testbed_guid, guid, cross_testbed_guid, cross_guid))
1024 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1027 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1028 if testbed_guid not in self._cross_data:
1029 self._cross_data[testbed_guid] = dict()
1030 if cross_testbed_guid not in self._cross_data[testbed_guid]:
1031 self._cross_data[testbed_guid][cross_testbed_guid] = set()
1032 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1034 def _get_cross_data(self, testbed_guid):
1036 if not testbed_guid in self._cross_data:
1039 # fetch attribute lists in one batch
1040 attribute_lists = dict()
1041 for cross_testbed_guid, guid_list in \
1042 self._cross_data[testbed_guid].iteritems():
1043 cross_testbed = self._testbeds[cross_testbed_guid]
1044 for cross_guid in guid_list:
1045 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1046 cross_testbed.get_attribute_list_deferred(cross_guid)
1048 # fetch attribute values in another batch
1049 for cross_testbed_guid, guid_list in \
1050 self._cross_data[testbed_guid].iteritems():
1051 cross_data[cross_testbed_guid] = dict()
1052 cross_testbed = self._testbeds[cross_testbed_guid]
1053 for cross_guid in guid_list:
1054 elem_cross_data = dict(
1056 _testbed_guid = cross_testbed_guid,
1057 _testbed_id = cross_testbed.testbed_id,
1058 _testbed_version = cross_testbed.testbed_version)
1059 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1060 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1061 for attr_name in attribute_list:
1062 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1063 elem_cross_data[attr_name] = attr_value
1065 # undefer all values - we'll have to serialize them probably later
1066 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1067 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1068 for attr_name, attr_value in elem_cross_data.iteritems():
1069 elem_cross_data[attr_name] = _undefer(attr_value)
1073 class ExperimentSuite(object):
def __init__(self, experiment_xml, access_config, repetitions,
        duration, wait_guids):
    """Store the parameters of a suite of repeated experiment runs.

    experiment_xml: experiment description handed to each controller
    access_config: access configuration whose attributes are copied per run
    repetitions: number of runs
    duration: minimum duration of each run
    wait_guids: guids whose completion each run waits for
    """
    # what to run
    self._experiment_xml = experiment_xml
    self._access_config = access_config
    # how to run it
    self._repetitions = repetitions
    self._duration = duration
    self._wait_guids = wait_guids
    # bookkeeping: controllers per run, run in progress, suite status
    self._experiments = dict()
    self._current = None
    self._status = TS.STATUS_ZERO
1087 self._status = TS.STATUS_STARTED
1088 self._thread = threading.Thread(target = self._run_experiment_suite)
1089 self._thread.start()
1096 def _run_experiment_suite(self):
1097 for i in xrange[0, self.repetitions]:
1099 self._run_one_experiment()
1101 def _run_one_experiment(self):
1102 access_config = proxy.AccessConfiguration()
1103 for attr in self._access_config.attributes:
1104 access_config.set_attribute_value(attr.name, attr.value)
1105 access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1106 root_dir = "%s_%d" % (
1107 access_config.get_attribute_value(DC.ROOT_DIRECTORY),
1109 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1110 controller = proxy.create_experiment_controller(self._experiment_xml,
1112 self._experiments[self._current] = controller
1114 started_at = time.time()
1115 # wait until all specified guids have finished execution
1116 if self._wait_guids:
1117 while all(itertools.imap(controller.is_finished, self._wait_guids):
1119 # wait until the minimum experiment duration time has elapsed
1121 while (time.time() - started_at) < self._duration:
1125 controller.shutdown()