2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
16 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
17 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
18 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20 def _undefer(deferred):
21 if hasattr(deferred, '_get'):
22 return deferred._get()
27 class TestbedController(object):
28 def __init__(self, testbed_id, testbed_version):
29 self._testbed_id = testbed_id
30 self._testbed_version = testbed_version
34 return self._testbed_id
37 def testbed_version(self):
38 return self._testbed_version
42 raise NotImplementedError
44 def defer_configure(self, name, value):
45 """Instructs setting a configuartion attribute for the testbed instance"""
46 raise NotImplementedError
48 def defer_create(self, guid, factory_id):
49 """Instructs creation of element """
50 raise NotImplementedError
52 def defer_create_set(self, guid, name, value):
53 """Instructs setting an initial attribute on an element"""
54 raise NotImplementedError
56 def defer_factory_set(self, guid, name, value):
57 """Instructs setting an attribute on a factory"""
58 raise NotImplementedError
60 def defer_connect(self, guid1, connector_type_name1, guid2,
61 connector_type_name2):
62 """Instructs creation of a connection between the given connectors"""
63 raise NotImplementedError
65 def defer_cross_connect(self,
66 guid, connector_type_name,
67 cross_guid, cross_testbed_guid,
68 cross_testbed_id, cross_factory_id,
69 cross_connector_type_name):
71 Instructs creation of a connection between the given connectors
72 of different testbed instances
74 raise NotImplementedError
76 def defer_add_trace(self, guid, trace_id):
77 """Instructs the addition of a trace"""
78 raise NotImplementedError
80 def defer_add_address(self, guid, address, netprefix, broadcast):
81 """Instructs the addition of an address"""
82 raise NotImplementedError
84 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
85 """Instructs the addition of a route"""
86 raise NotImplementedError
89 """After do_setup the testbed initial configuration is done"""
90 raise NotImplementedError
94 After do_create all instructed elements are created and
97 raise NotImplementedError
99 def do_connect_init(self):
101 After do_connect_init all internal connections between testbed elements
104 raise NotImplementedError
106 def do_connect_compl(self):
108 After do_connect all internal connections between testbed elements
111 raise NotImplementedError
113 def do_preconfigure(self):
115 Done just before resolving netrefs, after connection, before cross connections,
116 useful for early stages of configuration, for setting up stuff that might be
117 required for netref resolution.
119 raise NotImplementedError
121 def do_configure(self):
122 """After do_configure elements are configured"""
123 raise NotImplementedError
125 def do_prestart(self):
126 """Before do_start elements are prestart-configured"""
127 raise NotImplementedError
129 def do_cross_connect_init(self, cross_data):
131 After do_cross_connect_init initiation of all external connections
132 between different testbed elements is performed
134 raise NotImplementedError
136 def do_cross_connect_compl(self, cross_data):
138 After do_cross_connect_compl completion of all external connections
139 between different testbed elements is performed
141 raise NotImplementedError
144 raise NotImplementedError
147 raise NotImplementedError
150 raise NotImplementedError
152 def set(self, guid, name, value, time = TIME_NOW):
153 raise NotImplementedError
155 def get(self, guid, name, time = TIME_NOW):
156 raise NotImplementedError
158 def get_route(self, guid, index, attribute):
162 guid: guid of box to query
163 index: number of routing entry to fetch
164 attribute: one of Destination, NextHop, NetPrefix
166 raise NotImplementedError
168 def get_address(self, guid, index, attribute='Address'):
172 guid: guid of box to query
173 index: number of inteface to select
174 attribute: one of Address, NetPrefix, Broadcast
176 raise NotImplementedError
178 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
179 raise NotImplementedError
181 def get_factory_id(self, guid):
182 raise NotImplementedError
184 def action(self, time, guid, action):
185 raise NotImplementedError
187 def status(self, guid):
188 raise NotImplementedError
190 def trace(self, guid, trace_id, attribute='value'):
191 raise NotImplementedError
193 def traces_info(self):
194 """ dictionary of dictionaries:
200 filesize = size in bytes,
204 raise NotImplementedError
207 raise NotImplementedError
209 class ExperimentController(object):
210 def __init__(self, experiment_xml, root_dir):
211 self._experiment_design_xml = experiment_xml
212 self._experiment_execute_xml = None
213 self._testbeds = dict()
214 self._deployment_config = dict()
215 self._netrefs = collections.defaultdict(set)
216 self._testbed_netrefs = collections.defaultdict(set)
217 self._cross_data = dict()
218 self._root_dir = root_dir
219 self._netreffed_testbeds = set()
220 self._guids_in_testbed_cache = dict()
222 if experiment_xml is None and root_dir is not None:
224 self.load_experiment_xml()
225 self.load_execute_xml()
227 self.persist_experiment_xml()
230 def experiment_design_xml(self):
231 return self._experiment_design_xml
234 def experiment_execute_xml(self):
235 return self._experiment_execute_xml
240 for testbed_guid in self._testbeds.keys():
241 _guids = self._guids_in_testbed(testbed_guid)
246 def persist_experiment_xml(self):
247 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
248 f = open(xml_path, "w")
249 f.write(self._experiment_design_xml)
252 def persist_execute_xml(self):
253 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
254 f = open(xml_path, "w")
255 f.write(self._experiment_execute_xml)
258 def load_experiment_xml(self):
259 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
260 f = open(xml_path, "r")
261 self._experiment_design_xml = f.read()
264 def load_execute_xml(self):
265 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
266 f = open(xml_path, "r")
267 self._experiment_execute_xml = f.read()
270 def trace(self, guid, trace_id, attribute='value'):
271 testbed = self._testbed_for_guid(guid)
273 return testbed.trace(guid, trace_id, attribute)
274 raise RuntimeError("No element exists with guid %d" % guid)
276 def traces_info(self):
278 for guid, testbed in self._testbeds.iteritems():
279 tinfo = testbed.traces_info()
281 traces_info[guid] = testbed.traces_info()
285 def _parallel(callables):
288 @functools.wraps(callable)
289 def wrapped(*p, **kw):
294 traceback.print_exc(file=sys.stderr)
295 excs.append(sys.exc_info())
297 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
298 for thread in threads:
300 for thread in threads:
303 eTyp, eVal, eLoc = exc
304 raise eTyp, eVal, eLoc
309 def _start(self, recover = False):
310 parser = XmlExperimentParser()
313 xml = self._experiment_execute_xml
315 xml = self._experiment_design_xml
316 data = parser.from_xml_to_data(xml)
318 # instantiate testbed controllers
319 to_recover, to_restart = self._init_testbed_controllers(data, recover)
320 all_restart = set(to_restart)
323 # persist testbed connection data, for potential recovery
324 self._persist_testbed_proxies()
326 # recover recoverable controllers
327 for guid in to_recover:
328 self._testbeds[guid].do_setup()
329 self._testbeds[guid].recover()
331 def steps_to_configure(self, allowed_guids):
332 # perform setup in parallel for all test beds,
333 # wait for all threads to finish
334 self._parallel([testbed.do_setup
335 for guid,testbed in self._testbeds.iteritems()
336 if guid in allowed_guids])
338 # perform create-connect in parallel, wait
339 # (internal connections only)
340 self._parallel([testbed.do_create
341 for guid,testbed in self._testbeds.iteritems()
342 if guid in allowed_guids])
344 self._parallel([testbed.do_connect_init
345 for guid,testbed in self._testbeds.iteritems()
346 if guid in allowed_guids])
348 self._parallel([testbed.do_connect_compl
349 for guid,testbed in self._testbeds.iteritems()
350 if guid in allowed_guids])
352 self._parallel([testbed.do_preconfigure
353 for guid,testbed in self._testbeds.iteritems()
354 if guid in allowed_guids])
357 steps_to_configure(self, to_restart)
359 if self._netreffed_testbeds:
360 # initally resolve netrefs
361 self.do_netrefs(data, fail_if_undefined=False)
363 # rinse and repeat, for netreffed testbeds
364 netreffed_testbeds = set(self._netreffed_testbeds)
366 to_recover, to_restart = self._init_testbed_controllers(data, recover)
367 all_restart.update(to_restart)
370 # persist testbed connection data, for potential recovery
371 self._persist_testbed_proxies()
373 # recover recoverable controllers
374 for guid in to_recover:
375 self._testbeds[guid].do_setup()
376 self._testbeds[guid].recover()
378 # configure dependant testbeds
379 steps_to_configure(self, to_restart)
381 all_restart = [ self._testbeds[guid] for guid in all_restart ]
383 # final netref step, fail if anything's left unresolved
384 self.do_netrefs(data, fail_if_undefined=True)
386 # Only now, that netref dependencies have been solve, it is safe to
387 # program cross_connections
388 self._program_testbed_cross_connections(data)
390 # perform do_configure in parallel for al testbeds
391 # (it's internal configuration for each)
392 self._parallel([testbed.do_configure
393 for testbed in all_restart])
397 #print >>sys.stderr, "DO IT"
401 # cross-connect (cannot be done in parallel)
402 for guid, testbed in self._testbeds.iteritems():
403 cross_data = self._get_cross_data(guid)
404 testbed.do_cross_connect_init(cross_data)
405 for guid, testbed in self._testbeds.iteritems():
406 cross_data = self._get_cross_data(guid)
407 testbed.do_cross_connect_compl(cross_data)
411 # Last chance to configure (parallel on all testbeds)
412 self._parallel([testbed.do_prestart
413 for testbed in all_restart])
418 # update execution xml with execution-specific values
419 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
420 self._update_execute_xml()
421 self.persist_execute_xml()
423 # start experiment (parallel start on all testbeds)
424 self._parallel([testbed.start
425 for testbed in all_restart])
429 def _clear_caches(self):
430 # Cleaning cache for safety.
431 self._guids_in_testbed_cache = dict()
433 def _persist_testbed_proxies(self):
434 TRANSIENT = (DC.RECOVER,)
436 # persist access configuration for all testbeds, so that
437 # recovery mode can reconnect to them if it becomes necessary
438 conf = ConfigParser.RawConfigParser()
439 for testbed_guid, testbed_config in self._deployment_config.iteritems():
440 testbed_guid = str(testbed_guid)
441 conf.add_section(testbed_guid)
442 for attr in testbed_config.get_attribute_list():
443 if attr not in TRANSIENT:
444 conf.set(testbed_guid, attr,
445 testbed_config.get_attribute_value(attr))
447 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
451 def _load_testbed_proxies(self):
453 Attribute.STRING : 'get',
454 Attribute.BOOL : 'getboolean',
455 Attribute.ENUM : 'get',
456 Attribute.DOUBLE : 'getfloat',
457 Attribute.INTEGER : 'getint',
460 TRANSIENT = (DC.RECOVER,)
462 # deferred import because proxy needs
463 # our class definitions to define proxies
464 import nepi.util.proxy as proxy
466 conf = ConfigParser.RawConfigParser()
467 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
468 for testbed_guid in conf.sections():
469 testbed_config = proxy.AccessConfiguration()
470 testbed_guid = str(testbed_guid)
471 for attr in testbed_config.get_attribute_list():
472 if attr not in TRANSIENT:
473 getter = getattr(conf, TYPEMAP.get(
474 testbed_config.get_attribute_type(attr),
476 testbed_config.set_attribute_value(
477 attr, getter(testbed_guid, attr))
479 def _unpersist_testbed_proxies(self):
481 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
483 # Just print exceptions, this is just cleanup
485 ######## BUG ##########
486 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
487 #traceback.print_exc(file=sys.stderr)
489 def _update_execute_xml(self):
491 # For all elements in testbed,
492 # - gather immutable execute-readable attribuets lists
494 # Generate new design description from design xml
495 # (Wait for attributes lists - implicit syncpoint)
497 # For all elements in testbed,
498 # - gather all immutable execute-readable attribute
499 # values, asynchronously
500 # (Wait for attribute values - implicit syncpoint)
502 # For all elements in testbed,
503 # - inject non-None values into new design
504 # Generate execute xml from new design
506 attribute_lists = dict(
507 (testbed_guid, collections.defaultdict(dict))
508 for testbed_guid in self._testbeds
511 for testbed_guid, testbed in self._testbeds.iteritems():
512 guids = self._guids_in_testbed(testbed_guid)
514 attribute_lists[testbed_guid][guid] = \
515 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
517 parser = XmlExperimentParser()
518 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
520 attribute_values = dict(
521 (testbed_guid, collections.defaultdict(dict))
522 for testbed_guid in self._testbeds
525 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
526 testbed = self._testbeds[testbed_guid]
527 for guid, attribute_list in testbed_attribute_lists.iteritems():
528 attribute_list = _undefer(attribute_list)
529 attribute_values[testbed_guid][guid] = dict(
530 (attribute, testbed.get_deferred(guid, attribute))
531 for attribute in attribute_list
534 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
535 for guid, attribute_values in testbed_attribute_values.iteritems():
536 for attribute, value in attribute_values.iteritems():
537 value = _undefer(value)
538 if value is not None:
539 execute_data.add_attribute_data(guid, attribute, value)
541 self._experiment_execute_xml = parser.to_xml(data=execute_data)
544 for testbed in self._testbeds.values():
546 self._unpersist_testbed_proxies()
549 # reload perviously persisted testbed access configurations
550 self._load_testbed_proxies()
552 self._start(recover = True)
554 def is_finished(self, guid):
555 testbed = self._testbed_for_guid(guid)
557 return testbed.status(guid) == AS.STATUS_FINISHED
558 raise RuntimeError("No element exists with guid %d" % guid)
560 def status(self, guid):
561 testbed = self._testbed_for_guid(guid)
563 return testbed.status(guid)
564 raise RuntimeError("No element exists with guid %d" % guid)
566 def set(self, guid, name, value, time = TIME_NOW):
567 testbed = self._testbed_for_guid(guid)
569 testbed.set(guid, name, value, time)
571 raise RuntimeError("No element exists with guid %d" % guid)
573 def get(self, guid, name, time = TIME_NOW):
574 testbed = self._testbed_for_guid(guid)
576 return testbed.get(guid, name, time)
577 raise RuntimeError("No element exists with guid %d" % guid)
579 def get_deferred(self, guid, name, time = TIME_NOW):
580 testbed = self._testbed_for_guid(guid)
582 return testbed.get_deferred(guid, name, time)
583 raise RuntimeError("No element exists with guid %d" % guid)
585 def get_factory_id(self, guid):
586 testbed = self._testbed_for_guid(guid)
588 return testbed.get_factory_id(guid)
589 raise RuntimeError("No element exists with guid %d" % guid)
591 def get_testbed_id(self, guid):
592 testbed = self._testbed_for_guid(guid)
594 return testbed.testbed_id
595 raise RuntimeError("No element exists with guid %d" % guid)
597 def get_testbed_version(self, guid):
598 testbed = self._testbed_for_guid(guid)
600 return testbed.testbed_version
601 raise RuntimeError("No element exists with guid %d" % guid)
605 for testbed in self._testbeds.values():
609 exceptions.append(sys.exc_info())
610 for exc_info in exceptions:
611 raise exc_info[0], exc_info[1], exc_info[2]
613 def _testbed_for_guid(self, guid):
614 for testbed_guid in self._testbeds.keys():
615 if guid in self._guids_in_testbed(testbed_guid):
616 return self._testbeds[testbed_guid]
619 def _guids_in_testbed(self, testbed_guid):
620 if testbed_guid not in self._testbeds:
622 if testbed_guid not in self._guids_in_testbed_cache:
623 self._guids_in_testbed_cache[testbed_guid] = \
624 set(self._testbeds[testbed_guid].guids)
625 return self._guids_in_testbed_cache[testbed_guid]
628 def _netref_component_split(component):
629 match = COMPONENT_PATTERN.match(component)
631 return match.group("kind"), match.group("index")
633 return component, None
635 _NETREF_COMPONENT_GETTERS = {
637 lambda testbed, guid, index, name:
638 testbed.get_address(guid, int(index), name),
640 lambda testbed, guid, index, name:
641 testbed.get_route(guid, int(index), name),
643 lambda testbed, guid, index, name:
644 testbed.trace(guid, index, name),
646 lambda testbed, guid, index, name:
647 testbed.get(guid, name),
650 def resolve_netref_value(self, value, failval = None):
651 match = ATTRIBUTE_PATTERN_BASE.search(value)
653 label = match.group("label")
654 if label.startswith('GUID-'):
655 ref_guid = int(label[5:])
657 expr = match.group("expr")
658 component = (match.group("component") or "")[1:] # skip the dot
659 attribute = match.group("attribute")
661 # split compound components into component kind and index
662 # eg: 'addr[0]' -> ('addr', '0')
663 component, component_index = self._netref_component_split(component)
665 # find object and resolve expression
666 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
667 if component not in self._NETREF_COMPONENT_GETTERS:
668 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
669 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
672 ref_value = self._NETREF_COMPONENT_GETTERS[component](
673 ref_testbed, ref_guid, component_index, attribute)
675 return value.replace(match.group(), ref_value)
676 # couldn't find value
679 def do_netrefs(self, data, fail_if_undefined = False):
681 for (testbed_guid, guid), attrs in self._netrefs.items():
682 testbed = self._testbeds.get(testbed_guid)
683 if testbed is not None:
684 for name in set(attrs):
685 value = testbed.get(guid, name)
686 if isinstance(value, basestring):
687 ref_value = self.resolve_netref_value(value)
688 if ref_value is not None:
689 testbed.set(guid, name, ref_value)
691 elif fail_if_undefined:
692 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
694 del self._netrefs[(testbed_guid, guid)]
697 for testbed_guid, attrs in self._testbed_netrefs.items():
698 tb_data = dict(data.get_attribute_data(testbed_guid))
700 for name in set(attrs):
701 value = tb_data.get(name)
702 if isinstance(value, basestring):
703 ref_value = self.resolve_netref_value(value)
704 if ref_value is not None:
705 data.set_attribute_data(testbed_guid, name, ref_value)
707 elif fail_if_undefined:
708 raise ValueError, "Unresolvable netref in: %r" % (value,)
710 del self._testbed_netrefs[testbed_guid]
713 def _init_testbed_controllers(self, data, recover = False):
714 blacklist_testbeds = set(self._testbeds)
715 element_guids = list()
717 data_guids = data.guids
721 # gather label associations
722 for guid in data_guids:
723 if not data.is_testbed_data(guid):
724 (testbed_guid, factory_id) = data.get_box_data(guid)
725 label = data.get_attribute_data(guid, "label")
726 if label is not None:
727 if label in label_guids:
728 raise RuntimeError, "Label %r is not unique" % (label,)
729 label_guids[label] = guid
731 # create testbed controllers
732 for guid in data_guids:
733 if data.is_testbed_data(guid):
734 if guid not in self._testbeds:
736 self._create_testbed_controller(
737 guid, data, element_guids, recover)
740 blacklist_testbeds.add(guid)
745 policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
746 if policy == DC.POLICY_FAIL:
748 elif policy == DC.POLICY_RECOVER:
749 self._create_testbed_controller(
750 guid, data, element_guids, False)
752 elif policy == DC.POLICY_RESTART:
753 self._create_testbed_controller(
754 guid, data, element_guids, False)
761 # queue programmable elements
762 # - that have not been programmed already (blacklist_testbeds)
763 # - including recovered or restarted testbeds
764 # - but those that have no unresolved netrefs
765 for guid in data_guids:
766 if not data.is_testbed_data(guid):
767 (testbed_guid, factory_id) = data.get_box_data(guid)
768 if testbed_guid not in blacklist_testbeds:
769 element_guids.append(guid)
771 # replace references to elements labels for its guid
772 self._resolve_labels(data, data_guids, label_guids)
774 # program testbed controllers
776 self._program_testbed_controllers(element_guids, data)
778 return to_recover, to_restart
780 def _resolve_labels(self, data, data_guids, label_guids):
781 netrefs = self._netrefs
782 testbed_netrefs = self._testbed_netrefs
783 for guid in data_guids:
784 for name, value in data.get_attribute_data(guid):
785 if isinstance(value, basestring):
786 match = ATTRIBUTE_PATTERN_BASE.search(value)
788 label = match.group("label")
789 if not label.startswith('GUID-'):
790 ref_guid = label_guids.get(label)
791 if ref_guid is not None:
792 value = ATTRIBUTE_PATTERN_BASE.sub(
793 ATTRIBUTE_PATTERN_GUID_SUB % dict(
794 guid = 'GUID-%d' % (ref_guid,),
795 expr = match.group("expr"),
798 data.set_attribute_data(guid, name, value)
800 # memorize which guid-attribute pairs require
801 # postprocessing, to avoid excessive controller-testbed
802 # communication at configuration time
803 # (which could require high-latency network I/O)
804 if not data.is_testbed_data(guid):
805 (testbed_guid, factory_id) = data.get_box_data(guid)
806 netrefs[(testbed_guid, guid)].add(name)
808 testbed_netrefs[guid].add(name)
810 def _create_testbed_controller(self, guid, data, element_guids, recover):
811 (testbed_id, testbed_version) = data.get_testbed_data(guid)
812 deployment_config = self._deployment_config.get(guid)
814 # deferred import because proxy needs
815 # our class definitions to define proxies
816 import nepi.util.proxy as proxy
818 if deployment_config is None:
820 deployment_config = proxy.AccessConfiguration()
822 for (name, value) in data.get_attribute_data(guid):
823 if value is not None and deployment_config.has_attribute(name):
824 # if any deployment config attribute has a netref, we can't
825 # create this controller yet
826 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
827 # remember to re-issue this one
828 self._netreffed_testbeds.add(guid)
831 # copy deployment config attribute
832 deployment_config.set_attribute_value(name, value)
835 self._deployment_config[guid] = deployment_config
837 if deployment_config is not None:
838 # force recovery mode
839 deployment_config.set_attribute_value("recover",recover)
841 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
843 for (name, value) in data.get_attribute_data(guid):
844 testbed.defer_configure(name, value)
845 self._testbeds[guid] = testbed
846 if guid in self._netreffed_testbeds:
847 self._netreffed_testbeds.remove(guid)
849 def _program_testbed_controllers(self, element_guids, data):
850 def resolve_create_netref(data, guid, name, value):
851 # Try to resolve create-time netrefs, if possible
852 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
854 nuvalue = self.resolve_netref_value(value)
856 # Any trouble means we're not in shape to resolve the netref yet
858 if nuvalue is not None:
859 # Only if we succeed we remove the netref deferral entry
861 data.set_attribute_data(guid, name, value)
862 if (testbed_guid, guid) in self._netrefs:
863 self._netrefs[(testbed_guid, guid)].discard(name)
866 for guid in element_guids:
867 (testbed_guid, factory_id) = data.get_box_data(guid)
868 testbed = self._testbeds.get(testbed_guid)
869 if testbed is not None:
871 testbed.defer_create(guid, factory_id)
873 for (name, value) in data.get_attribute_data(guid):
874 value = resolve_create_netref(data, guid, name, value)
875 testbed.defer_create_set(guid, name, value)
877 for guid in element_guids:
878 (testbed_guid, factory_id) = data.get_box_data(guid)
879 testbed = self._testbeds.get(testbed_guid)
880 if testbed is not None:
882 for trace_id in data.get_trace_data(guid):
883 testbed.defer_add_trace(guid, trace_id)
885 for (address, netprefix, broadcast) in data.get_address_data(guid):
887 testbed.defer_add_address(guid, address, netprefix,
890 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
891 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
892 # store connections data
893 for (connector_type_name, other_guid, other_connector_type_name) \
894 in data.get_connection_data(guid):
895 (other_testbed_guid, other_factory_id) = data.get_box_data(
897 if testbed_guid == other_testbed_guid:
898 # each testbed should take care of enforcing internal
899 # connection simmetry, so each connection is only
900 # added in one direction
901 testbed.defer_connect(guid, connector_type_name,
902 other_guid, other_connector_type_name)
904 def _program_testbed_cross_connections(self, data):
905 data_guids = data.guids
906 for guid in data_guids:
907 if not data.is_testbed_data(guid):
908 (testbed_guid, factory_id) = data.get_box_data(guid)
909 testbed = self._testbeds.get(testbed_guid)
910 if testbed is not None:
911 for (connector_type_name, cross_guid, cross_connector_type_name) \
912 in data.get_connection_data(guid):
913 (testbed_guid, factory_id) = data.get_box_data(guid)
914 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
916 if testbed_guid != cross_testbed_guid:
917 cross_testbed = self._testbeds[cross_testbed_guid]
918 cross_testbed_id = cross_testbed.testbed_id
919 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
920 cross_testbed_guid, cross_testbed_id, cross_factory_id,
921 cross_connector_type_name)
922 # save cross data for later
923 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
926 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
927 if testbed_guid not in self._cross_data:
928 self._cross_data[testbed_guid] = dict()
929 if cross_testbed_guid not in self._cross_data[testbed_guid]:
930 self._cross_data[testbed_guid][cross_testbed_guid] = set()
931 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
933 def _get_cross_data(self, testbed_guid):
935 if not testbed_guid in self._cross_data:
938 # fetch attribute lists in one batch
939 attribute_lists = dict()
940 for cross_testbed_guid, guid_list in \
941 self._cross_data[testbed_guid].iteritems():
942 cross_testbed = self._testbeds[cross_testbed_guid]
943 for cross_guid in guid_list:
944 attribute_lists[(cross_testbed_guid, cross_guid)] = \
945 cross_testbed.get_attribute_list_deferred(cross_guid)
947 # fetch attribute values in another batch
948 for cross_testbed_guid, guid_list in \
949 self._cross_data[testbed_guid].iteritems():
950 cross_data[cross_testbed_guid] = dict()
951 cross_testbed = self._testbeds[cross_testbed_guid]
952 for cross_guid in guid_list:
953 elem_cross_data = dict(
955 _testbed_guid = cross_testbed_guid,
956 _testbed_id = cross_testbed.testbed_id,
957 _testbed_version = cross_testbed.testbed_version)
958 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
959 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
960 for attr_name in attribute_list:
961 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
962 elem_cross_data[attr_name] = attr_value
964 # undefer all values - we'll have to serialize them probably later
965 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
966 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
967 for attr_name, attr_value in elem_cross_data.iteritems():
968 elem_cross_data[attr_name] = _undefer(attr_value)