2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
# Netref syntax: {#[<label>]<expr>#}, where <expr> is an optional component
# selector (.addr[N], .route[N] or .trace[N]) followed by .[<attribute>].
# The dot in front of the attribute bracket is escaped so that only a literal
# '.' is accepted as separator (an unescaped '.' would match any character).
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# Template used to rebuild a netref with a 'GUID-<n>' style label in place of
# the original label; filled in with the keys 'guid' and 'expr'.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as 'addr[0]' into its kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20 def _undefer(deferred):
21 if hasattr(deferred, '_get'):
22 return deferred._get()
27 class TestbedController(object):
28 def __init__(self, testbed_id, testbed_version):
29 self._testbed_id = testbed_id
30 self._testbed_version = testbed_version
34 return self._testbed_id
def testbed_version(self):
    """Return the testbed version stored at construction time."""
    return self._testbed_version
42 raise NotImplementedError
def defer_configure(self, name, value):
    """Instructs setting a configuration attribute for the testbed instance.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def defer_create(self, guid, factory_id):
    """Instructs creation of the element ``guid`` using ``factory_id``.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def defer_create_set(self, guid, name, value):
    """Instructs setting an initial attribute on an element.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def defer_factory_set(self, guid, name, value):
    """Instructs setting an attribute on a factory.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def defer_connect(self, guid1, connector_type_name1, guid2, 
        connector_type_name2):
    """Instructs creation of a connection between the given connectors.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def defer_cross_connect(self, 
        guid, connector_type_name,
        cross_guid, cross_testbed_guid,
        cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """
    Instructs creation of a connection between the given connectors
    of different testbed instances.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def defer_add_trace(self, guid, trace_id):
    """Instructs the addition of a trace.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Instructs the addition of an address.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
    """Instructs the addition of a route.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
89 """After do_setup the testbed initial configuration is done"""
90 raise NotImplementedError
94 After do_create all instructed elements are created and
97 raise NotImplementedError
99 def do_connect_init(self):
101 After do_connect_init all internal connections between testbed elements
104 raise NotImplementedError
106 def do_connect_compl(self):
108 After do_connect all internal connections between testbed elements
111 raise NotImplementedError
def do_preconfigure(self):
    """
    Done just before resolving netrefs, after connection, before cross
    connections. Useful for early stages of configuration, for setting up
    anything that might be required for netref resolution.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def do_configure(self):
    """After do_configure, elements are configured.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def do_prestart(self):
    """Before do_start, elements are prestart-configured.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def do_cross_connect_init(self, cross_data):
    """
    After do_cross_connect_init, initiation of all external connections
    between different testbed elements is performed.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def do_cross_connect_compl(self, cross_data):
    """
    After do_cross_connect_compl, completion of all external connections
    between different testbed elements is performed.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
144 raise NotImplementedError
147 raise NotImplementedError
151 On testbed recovery (if recovery is a supported policy), the controller
152 instance will be re-created and the following sequence invoked:
155 defer_X - programming the testbed with persisted execution values
156 (not design values). Execution values (ExecImmutable attributes)
157 should be enough to recreate the testbed's state.
159 <cross-connection methods>
161 Start will not be called, and after cross connection invocations,
162 the testbed is supposed to be fully functional again.
164 raise NotImplementedError
def set(self, guid, name, value, time = TIME_NOW):
    """Interface hook for setting attribute ``name`` of element ``guid``.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def get(self, guid, name, time = TIME_NOW):
    """Interface hook for reading attribute ``name`` of element ``guid``.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def get_route(self, guid, index, attribute):
    """
    Interface hook for querying one attribute of a routing entry.

    guid: guid of box to query
    index: number of routing entry to fetch
    attribute: one of Destination, NextHop, NetPrefix

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def get_address(self, guid, index, attribute='Address'):
    """
    Interface hook for querying one attribute of an address entry.

    guid: guid of box to query
    index: number of interface to select
    attribute: one of Address, NetPrefix, Broadcast

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """Interface hook for listing the attributes of element ``guid``.

    NOTE(review): how ``filter_flags`` and ``exclude`` combine is not
    visible here - confirm against a concrete controller.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def get_factory_id(self, guid):
    """Interface hook for retrieving the factory id of element ``guid``.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def action(self, time, guid, action):
    """Interface hook taking a time, an element guid and an action.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def status(self, guid):
    """Interface hook for querying the status of element ``guid``.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
def trace(self, guid, trace_id, attribute='value'):
    """Interface hook for reading ``attribute`` of trace ``trace_id`` on
    element ``guid``.

    Base-class placeholder: always raises NotImplementedError.
    """
    raise NotImplementedError
207 def traces_info(self):
208 """ dictionary of dictionaries:
214 filesize = size in bytes,
218 raise NotImplementedError
221 raise NotImplementedError
223 class ExperimentController(object):
224 def __init__(self, experiment_xml, root_dir):
225 self._experiment_design_xml = experiment_xml
226 self._experiment_execute_xml = None
227 self._testbeds = dict()
228 self._deployment_config = dict()
229 self._netrefs = collections.defaultdict(set)
230 self._testbed_netrefs = collections.defaultdict(set)
231 self._cross_data = dict()
232 self._root_dir = root_dir
233 self._netreffed_testbeds = set()
234 self._guids_in_testbed_cache = dict()
236 if experiment_xml is None and root_dir is not None:
238 self.load_experiment_xml()
239 self.load_execute_xml()
241 self.persist_experiment_xml()
def experiment_design_xml(self):
    """Return the design XML currently held by this controller."""
    return self._experiment_design_xml
def experiment_execute_xml(self):
    """Return the execution XML currently held by this controller.

    The value is None until it has been loaded or generated.
    """
    return self._experiment_execute_xml
254 for testbed_guid in self._testbeds.keys():
255 _guids = self._guids_in_testbed(testbed_guid)
def persist_experiment_xml(self):
    """Write the design XML to ``experiment-design.xml`` in the root dir.

    The file is handled through a context manager so that it is flushed
    and closed even when the write raises.
    """
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    with open(xml_path, "w") as f:
        f.write(self._experiment_design_xml)
def persist_execute_xml(self):
    """Write the execution XML to ``experiment-execute.xml`` in the root dir.

    The file is handled through a context manager so that it is flushed
    and closed even when the write raises.
    """
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    with open(xml_path, "w") as f:
        f.write(self._experiment_execute_xml)
def load_experiment_xml(self):
    """Load ``experiment-design.xml`` from the root dir into the controller.

    The file is handled through a context manager so the handle is
    released even when reading raises.
    """
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    with open(xml_path, "r") as f:
        self._experiment_design_xml = f.read()
def load_execute_xml(self):
    """Load ``experiment-execute.xml`` from the root dir into the controller.

    The file is handled through a context manager so the handle is
    released even when reading raises.
    """
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    with open(xml_path, "r") as f:
        self._experiment_execute_xml = f.read()
def trace(self, guid, trace_id, attribute='value'):
    """Fetch a trace attribute from the testbed that owns ``guid``.

    Delegates to the owning testbed's ``trace`` method.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.trace(guid, trace_id, attribute)
290 def traces_info(self):
292 for guid, testbed in self._testbeds.iteritems():
293 tinfo = testbed.traces_info()
295 traces_info[guid] = testbed.traces_info()
299 def _parallel(callables):
302 @functools.wraps(callable)
303 def wrapped(*p, **kw):
308 traceback.print_exc(file=sys.stderr)
309 excs.append(sys.exc_info())
311 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
312 for thread in threads:
314 for thread in threads:
317 eTyp, eVal, eLoc = exc
318 raise eTyp, eVal, eLoc
323 def _start(self, recover = False):
324 parser = XmlExperimentParser()
327 xml = self._experiment_execute_xml
329 xml = self._experiment_design_xml
330 data = parser.from_xml_to_data(xml)
332 # instantiate testbed controllers
333 to_recover, to_restart = self._init_testbed_controllers(data, recover)
334 all_restart = set(to_restart)
337 # persist testbed connection data, for potential recovery
338 self._persist_testbed_proxies()
340 # recover recoverable controllers
341 for guid in to_recover:
342 self._testbeds[guid].do_setup()
343 self._testbeds[guid].recover()
345 def steps_to_configure(self, allowed_guids):
346 # perform setup in parallel for all test beds,
347 # wait for all threads to finish
348 self._parallel([testbed.do_setup
349 for guid,testbed in self._testbeds.iteritems()
350 if guid in allowed_guids])
352 # perform create-connect in parallel, wait
353 # (internal connections only)
354 self._parallel([testbed.do_create
355 for guid,testbed in self._testbeds.iteritems()
356 if guid in allowed_guids])
358 self._parallel([testbed.do_connect_init
359 for guid,testbed in self._testbeds.iteritems()
360 if guid in allowed_guids])
362 self._parallel([testbed.do_connect_compl
363 for guid,testbed in self._testbeds.iteritems()
364 if guid in allowed_guids])
366 self._parallel([testbed.do_preconfigure
367 for guid,testbed in self._testbeds.iteritems()
368 if guid in allowed_guids])
371 steps_to_configure(self, to_restart)
373 if self._netreffed_testbeds:
374 # initally resolve netrefs
375 self.do_netrefs(data, fail_if_undefined=False)
377 # rinse and repeat, for netreffed testbeds
378 netreffed_testbeds = set(self._netreffed_testbeds)
380 to_recover, to_restart = self._init_testbed_controllers(data, recover)
381 all_restart.update(to_restart)
384 # persist testbed connection data, for potential recovery
385 self._persist_testbed_proxies()
387 # recover recoverable controllers
388 for guid in to_recover:
389 self._testbeds[guid].do_setup()
390 self._testbeds[guid].recover()
392 # configure dependant testbeds
393 steps_to_configure(self, to_restart)
395 all_restart = [ self._testbeds[guid] for guid in all_restart ]
397 # final netref step, fail if anything's left unresolved
398 self.do_netrefs(data, fail_if_undefined=True)
400 # Only now, that netref dependencies have been solve, it is safe to
401 # program cross_connections
402 self._program_testbed_cross_connections(data)
404 # perform do_configure in parallel for al testbeds
405 # (it's internal configuration for each)
406 self._parallel([testbed.do_configure
407 for testbed in all_restart])
411 #print >>sys.stderr, "DO IT"
415 # cross-connect (cannot be done in parallel)
416 for guid, testbed in self._testbeds.iteritems():
417 cross_data = self._get_cross_data(guid)
418 testbed.do_cross_connect_init(cross_data)
419 for guid, testbed in self._testbeds.iteritems():
420 cross_data = self._get_cross_data(guid)
421 testbed.do_cross_connect_compl(cross_data)
425 # Last chance to configure (parallel on all testbeds)
426 self._parallel([testbed.do_prestart
427 for testbed in all_restart])
432 # update execution xml with execution-specific values
433 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
434 self._update_execute_xml()
435 self.persist_execute_xml()
437 # start experiment (parallel start on all testbeds)
438 self._parallel([testbed.start
439 for testbed in all_restart])
443 def _clear_caches(self):
444 # Cleaning cache for safety.
445 self._guids_in_testbed_cache = dict()
447 def _persist_testbed_proxies(self):
448 TRANSIENT = (DC.RECOVER,)
450 # persist access configuration for all testbeds, so that
451 # recovery mode can reconnect to them if it becomes necessary
452 conf = ConfigParser.RawConfigParser()
453 for testbed_guid, testbed_config in self._deployment_config.iteritems():
454 testbed_guid = str(testbed_guid)
455 conf.add_section(testbed_guid)
456 for attr in testbed_config.get_attribute_list():
457 if attr not in TRANSIENT:
458 conf.set(testbed_guid, attr,
459 testbed_config.get_attribute_value(attr))
461 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
465 def _load_testbed_proxies(self):
467 Attribute.STRING : 'get',
468 Attribute.BOOL : 'getboolean',
469 Attribute.ENUM : 'get',
470 Attribute.DOUBLE : 'getfloat',
471 Attribute.INTEGER : 'getint',
474 TRANSIENT = (DC.RECOVER,)
476 # deferred import because proxy needs
477 # our class definitions to define proxies
478 import nepi.util.proxy as proxy
480 conf = ConfigParser.RawConfigParser()
481 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
482 for testbed_guid in conf.sections():
483 testbed_config = proxy.AccessConfiguration()
484 testbed_guid = str(testbed_guid)
485 for attr in testbed_config.get_attribute_list():
486 if attr not in TRANSIENT:
487 getter = getattr(conf, TYPEMAP.get(
488 testbed_config.get_attribute_type(attr),
490 testbed_config.set_attribute_value(
491 attr, getter(testbed_guid, attr))
493 def _unpersist_testbed_proxies(self):
495 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
497 # Just print exceptions, this is just cleanup
499 ######## BUG ##########
500 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
501 #traceback.print_exc(file=sys.stderr)
503 def _update_execute_xml(self):
505 # For all elements in testbed,
506 # - gather immutable execute-readable attribuets lists
508 # Generate new design description from design xml
509 # (Wait for attributes lists - implicit syncpoint)
511 # For all elements in testbed,
512 # - gather all immutable execute-readable attribute
513 # values, asynchronously
514 # (Wait for attribute values - implicit syncpoint)
516 # For all elements in testbed,
517 # - inject non-None values into new design
518 # Generate execute xml from new design
520 attribute_lists = dict(
521 (testbed_guid, collections.defaultdict(dict))
522 for testbed_guid in self._testbeds
525 for testbed_guid, testbed in self._testbeds.iteritems():
526 guids = self._guids_in_testbed(testbed_guid)
528 attribute_lists[testbed_guid][guid] = \
529 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
531 parser = XmlExperimentParser()
532 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
534 attribute_values = dict(
535 (testbed_guid, collections.defaultdict(dict))
536 for testbed_guid in self._testbeds
539 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
540 testbed = self._testbeds[testbed_guid]
541 for guid, attribute_list in testbed_attribute_lists.iteritems():
542 attribute_list = _undefer(attribute_list)
543 attribute_values[testbed_guid][guid] = dict(
544 (attribute, testbed.get_deferred(guid, attribute))
545 for attribute in attribute_list
548 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
549 for guid, attribute_values in testbed_attribute_values.iteritems():
550 for attribute, value in attribute_values.iteritems():
551 value = _undefer(value)
552 if value is not None:
553 execute_data.add_attribute_data(guid, attribute, value)
555 self._experiment_execute_xml = parser.to_xml(data=execute_data)
558 for testbed in self._testbeds.values():
560 self._unpersist_testbed_proxies()
563 # reload perviously persisted testbed access configurations
564 self._load_testbed_proxies()
566 self._start(recover = True)
def is_finished(self, guid):
    """Tell whether the element ``guid`` has finished.

    Returns True when the owning testbed reports ``AS.STATUS_FINISHED``
    for the element.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.status(guid) == AS.STATUS_FINISHED
def status(self, guid):
    """Return the status reported by the testbed that owns ``guid``.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.status(guid)
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute ``name`` of element ``guid`` through its testbed.

    The call is forwarded, including ``time``, to the owning testbed's
    ``set`` method. Nothing is returned.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    testbed.set(guid, name, value, time)
def get(self, guid, name, time = TIME_NOW):
    """Read attribute ``name`` of element ``guid`` through its testbed.

    The call is forwarded, including ``time``, to the owning testbed's
    ``get`` method and its result is returned.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get(guid, name, time)
def get_deferred(self, guid, name, time = TIME_NOW):
    """Deferred variant of ``get``.

    Returns whatever the owning testbed's ``get_deferred`` returns for the
    given element, attribute and time.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get_deferred(guid, name, time)
def get_factory_id(self, guid):
    """Return the factory id of element ``guid`` as reported by its testbed.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get_factory_id(guid)
def get_testbed_id(self, guid):
    """Return the ``testbed_id`` of the testbed that owns element ``guid``.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.testbed_id
def get_testbed_version(self, guid):
    """Return the ``testbed_version`` of the testbed owning element ``guid``.

    Raises RuntimeError when no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # _testbed_for_guid yields None for unknown guids; fail explicitly
    # instead of dereferencing None.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.testbed_version
619 for testbed in self._testbeds.values():
623 exceptions.append(sys.exc_info())
624 for exc_info in exceptions:
625 raise exc_info[0], exc_info[1], exc_info[2]
627 def _testbed_for_guid(self, guid):
628 for testbed_guid in self._testbeds.keys():
629 if guid in self._guids_in_testbed(testbed_guid):
630 return self._testbeds[testbed_guid]
633 def _guids_in_testbed(self, testbed_guid):
634 if testbed_guid not in self._testbeds:
636 if testbed_guid not in self._guids_in_testbed_cache:
637 self._guids_in_testbed_cache[testbed_guid] = \
638 set(self._testbeds[testbed_guid].guids)
639 return self._guids_in_testbed_cache[testbed_guid]
642 def _netref_component_split(component):
643 match = COMPONENT_PATTERN.match(component)
645 return match.group("kind"), match.group("index")
647 return component, None
649 _NETREF_COMPONENT_GETTERS = {
651 lambda testbed, guid, index, name:
652 testbed.get_address(guid, int(index), name),
654 lambda testbed, guid, index, name:
655 testbed.get_route(guid, int(index), name),
657 lambda testbed, guid, index, name:
658 testbed.trace(guid, index, name),
660 lambda testbed, guid, index, name:
661 testbed.get(guid, name),
664 def resolve_netref_value(self, value, failval = None):
665 match = ATTRIBUTE_PATTERN_BASE.search(value)
667 label = match.group("label")
668 if label.startswith('GUID-'):
669 ref_guid = int(label[5:])
671 expr = match.group("expr")
672 component = (match.group("component") or "")[1:] # skip the dot
673 attribute = match.group("attribute")
675 # split compound components into component kind and index
676 # eg: 'addr[0]' -> ('addr', '0')
677 component, component_index = self._netref_component_split(component)
679 # find object and resolve expression
680 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
681 if component not in self._NETREF_COMPONENT_GETTERS:
682 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
683 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
686 ref_value = self._NETREF_COMPONENT_GETTERS[component](
687 ref_testbed, ref_guid, component_index, attribute)
689 return value.replace(match.group(), ref_value)
690 # couldn't find value
693 def do_netrefs(self, data, fail_if_undefined = False):
695 for (testbed_guid, guid), attrs in self._netrefs.items():
696 testbed = self._testbeds.get(testbed_guid)
697 if testbed is not None:
698 for name in set(attrs):
699 value = testbed.get(guid, name)
700 if isinstance(value, basestring):
701 ref_value = self.resolve_netref_value(value)
702 if ref_value is not None:
703 testbed.set(guid, name, ref_value)
705 elif fail_if_undefined:
706 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
708 del self._netrefs[(testbed_guid, guid)]
711 for testbed_guid, attrs in self._testbed_netrefs.items():
712 tb_data = dict(data.get_attribute_data(testbed_guid))
714 for name in set(attrs):
715 value = tb_data.get(name)
716 if isinstance(value, basestring):
717 ref_value = self.resolve_netref_value(value)
718 if ref_value is not None:
719 data.set_attribute_data(testbed_guid, name, ref_value)
721 elif fail_if_undefined:
722 raise ValueError, "Unresolvable netref in: %r" % (value,)
724 del self._testbed_netrefs[testbed_guid]
727 def _init_testbed_controllers(self, data, recover = False):
728 blacklist_testbeds = set(self._testbeds)
729 element_guids = list()
731 data_guids = data.guids
735 # gather label associations
736 for guid in data_guids:
737 if not data.is_testbed_data(guid):
738 (testbed_guid, factory_id) = data.get_box_data(guid)
739 label = data.get_attribute_data(guid, "label")
740 if label is not None:
741 if label in label_guids:
742 raise RuntimeError, "Label %r is not unique" % (label,)
743 label_guids[label] = guid
745 # create testbed controllers
746 for guid in data_guids:
747 if data.is_testbed_data(guid):
748 if guid not in self._testbeds:
750 self._create_testbed_controller(
751 guid, data, element_guids, recover)
754 blacklist_testbeds.add(guid)
759 policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
760 if policy == DC.POLICY_FAIL:
762 elif policy == DC.POLICY_RECOVER:
763 self._create_testbed_controller(
764 guid, data, element_guids, False)
766 elif policy == DC.POLICY_RESTART:
767 self._create_testbed_controller(
768 guid, data, element_guids, False)
775 # queue programmable elements
776 # - that have not been programmed already (blacklist_testbeds)
777 # - including recovered or restarted testbeds
778 # - but those that have no unresolved netrefs
779 for guid in data_guids:
780 if not data.is_testbed_data(guid):
781 (testbed_guid, factory_id) = data.get_box_data(guid)
782 if testbed_guid not in blacklist_testbeds:
783 element_guids.append(guid)
785 # replace references to elements labels for its guid
786 self._resolve_labels(data, data_guids, label_guids)
788 # program testbed controllers
790 self._program_testbed_controllers(element_guids, data)
792 return to_recover, to_restart
794 def _resolve_labels(self, data, data_guids, label_guids):
795 netrefs = self._netrefs
796 testbed_netrefs = self._testbed_netrefs
797 for guid in data_guids:
798 for name, value in data.get_attribute_data(guid):
799 if isinstance(value, basestring):
800 match = ATTRIBUTE_PATTERN_BASE.search(value)
802 label = match.group("label")
803 if not label.startswith('GUID-'):
804 ref_guid = label_guids.get(label)
805 if ref_guid is not None:
806 value = ATTRIBUTE_PATTERN_BASE.sub(
807 ATTRIBUTE_PATTERN_GUID_SUB % dict(
808 guid = 'GUID-%d' % (ref_guid,),
809 expr = match.group("expr"),
812 data.set_attribute_data(guid, name, value)
814 # memorize which guid-attribute pairs require
815 # postprocessing, to avoid excessive controller-testbed
816 # communication at configuration time
817 # (which could require high-latency network I/O)
818 if not data.is_testbed_data(guid):
819 (testbed_guid, factory_id) = data.get_box_data(guid)
820 netrefs[(testbed_guid, guid)].add(name)
822 testbed_netrefs[guid].add(name)
824 def _create_testbed_controller(self, guid, data, element_guids, recover):
825 (testbed_id, testbed_version) = data.get_testbed_data(guid)
826 deployment_config = self._deployment_config.get(guid)
828 # deferred import because proxy needs
829 # our class definitions to define proxies
830 import nepi.util.proxy as proxy
832 if deployment_config is None:
834 deployment_config = proxy.AccessConfiguration()
836 for (name, value) in data.get_attribute_data(guid):
837 if value is not None and deployment_config.has_attribute(name):
838 # if any deployment config attribute has a netref, we can't
839 # create this controller yet
840 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
841 # remember to re-issue this one
842 self._netreffed_testbeds.add(guid)
845 # copy deployment config attribute
846 deployment_config.set_attribute_value(name, value)
849 self._deployment_config[guid] = deployment_config
851 if deployment_config is not None:
852 # force recovery mode
853 deployment_config.set_attribute_value("recover",recover)
855 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
857 for (name, value) in data.get_attribute_data(guid):
858 testbed.defer_configure(name, value)
859 self._testbeds[guid] = testbed
860 if guid in self._netreffed_testbeds:
861 self._netreffed_testbeds.remove(guid)
863 def _program_testbed_controllers(self, element_guids, data):
864 def resolve_create_netref(data, guid, name, value):
865 # Try to resolve create-time netrefs, if possible
866 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
868 nuvalue = self.resolve_netref_value(value)
870 # Any trouble means we're not in shape to resolve the netref yet
872 if nuvalue is not None:
873 # Only if we succeed we remove the netref deferral entry
875 data.set_attribute_data(guid, name, value)
876 if (testbed_guid, guid) in self._netrefs:
877 self._netrefs[(testbed_guid, guid)].discard(name)
880 for guid in element_guids:
881 (testbed_guid, factory_id) = data.get_box_data(guid)
882 testbed = self._testbeds.get(testbed_guid)
883 if testbed is not None:
885 testbed.defer_create(guid, factory_id)
887 for (name, value) in data.get_attribute_data(guid):
888 value = resolve_create_netref(data, guid, name, value)
889 testbed.defer_create_set(guid, name, value)
891 for guid in element_guids:
892 (testbed_guid, factory_id) = data.get_box_data(guid)
893 testbed = self._testbeds.get(testbed_guid)
894 if testbed is not None:
896 for trace_id in data.get_trace_data(guid):
897 testbed.defer_add_trace(guid, trace_id)
899 for (address, netprefix, broadcast) in data.get_address_data(guid):
901 testbed.defer_add_address(guid, address, netprefix,
904 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
905 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
906 # store connections data
907 for (connector_type_name, other_guid, other_connector_type_name) \
908 in data.get_connection_data(guid):
909 (other_testbed_guid, other_factory_id) = data.get_box_data(
911 if testbed_guid == other_testbed_guid:
912 # each testbed should take care of enforcing internal
913 # connection simmetry, so each connection is only
914 # added in one direction
915 testbed.defer_connect(guid, connector_type_name,
916 other_guid, other_connector_type_name)
918 def _program_testbed_cross_connections(self, data):
919 data_guids = data.guids
920 for guid in data_guids:
921 if not data.is_testbed_data(guid):
922 (testbed_guid, factory_id) = data.get_box_data(guid)
923 testbed = self._testbeds.get(testbed_guid)
924 if testbed is not None:
925 for (connector_type_name, cross_guid, cross_connector_type_name) \
926 in data.get_connection_data(guid):
927 (testbed_guid, factory_id) = data.get_box_data(guid)
928 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
930 if testbed_guid != cross_testbed_guid:
931 cross_testbed = self._testbeds[cross_testbed_guid]
932 cross_testbed_id = cross_testbed.testbed_id
933 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
934 cross_testbed_guid, cross_testbed_id, cross_factory_id,
935 cross_connector_type_name)
936 # save cross data for later
937 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
940 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
941 if testbed_guid not in self._cross_data:
942 self._cross_data[testbed_guid] = dict()
943 if cross_testbed_guid not in self._cross_data[testbed_guid]:
944 self._cross_data[testbed_guid][cross_testbed_guid] = set()
945 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
947 def _get_cross_data(self, testbed_guid):
949 if not testbed_guid in self._cross_data:
952 # fetch attribute lists in one batch
953 attribute_lists = dict()
954 for cross_testbed_guid, guid_list in \
955 self._cross_data[testbed_guid].iteritems():
956 cross_testbed = self._testbeds[cross_testbed_guid]
957 for cross_guid in guid_list:
958 attribute_lists[(cross_testbed_guid, cross_guid)] = \
959 cross_testbed.get_attribute_list_deferred(cross_guid)
961 # fetch attribute values in another batch
962 for cross_testbed_guid, guid_list in \
963 self._cross_data[testbed_guid].iteritems():
964 cross_data[cross_testbed_guid] = dict()
965 cross_testbed = self._testbeds[cross_testbed_guid]
966 for cross_guid in guid_list:
967 elem_cross_data = dict(
969 _testbed_guid = cross_testbed_guid,
970 _testbed_id = cross_testbed.testbed_id,
971 _testbed_version = cross_testbed.testbed_version)
972 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
973 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
974 for attr_name in attribute_list:
975 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
976 elem_cross_data[attr_name] = attr_value
978 # undefer all values - we'll have to serialize them probably later
979 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
980 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
981 for attr_name, attr_value in elem_cross_data.iteritems():
982 elem_cross_data[attr_name] = _undefer(attr_value)