2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
16 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
17 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
18 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20 def _undefer(deferred):
21 if hasattr(deferred, '_get'):
22 return deferred._get()
27 class TestbedController(object):
28 def __init__(self, testbed_id, testbed_version):
29 self._testbed_id = testbed_id
30 self._testbed_version = testbed_version
34 return self._testbed_id
37 def testbed_version(self):
38 return self._testbed_version
42 raise NotImplementedError
44 def defer_configure(self, name, value):
45 """Instructs setting a configuartion attribute for the testbed instance"""
46 raise NotImplementedError
48 def defer_create(self, guid, factory_id):
49 """Instructs creation of element """
50 raise NotImplementedError
52 def defer_create_set(self, guid, name, value):
53 """Instructs setting an initial attribute on an element"""
54 raise NotImplementedError
56 def defer_factory_set(self, guid, name, value):
57 """Instructs setting an attribute on a factory"""
58 raise NotImplementedError
60 def defer_connect(self, guid1, connector_type_name1, guid2,
61 connector_type_name2):
62 """Instructs creation of a connection between the given connectors"""
63 raise NotImplementedError
65 def defer_cross_connect(self,
66 guid, connector_type_name,
67 cross_guid, cross_testbed_guid,
68 cross_testbed_id, cross_factory_id,
69 cross_connector_type_name):
71 Instructs creation of a connection between the given connectors
72 of different testbed instances
74 raise NotImplementedError
76 def defer_add_trace(self, guid, trace_id):
77 """Instructs the addition of a trace"""
78 raise NotImplementedError
80 def defer_add_address(self, guid, address, netprefix, broadcast):
81 """Instructs the addition of an address"""
82 raise NotImplementedError
84 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
85 """Instructs the addition of a route"""
86 raise NotImplementedError
89 """After do_setup the testbed initial configuration is done"""
90 raise NotImplementedError
94 After do_create all instructed elements are created and
97 raise NotImplementedError
99 def do_connect_init(self):
101 After do_connect_init all internal connections between testbed elements
104 raise NotImplementedError
106 def do_connect_compl(self):
108 After do_connect all internal connections between testbed elements
111 raise NotImplementedError
113 def do_preconfigure(self):
115 Done just before resolving netrefs, after connection, before cross connections,
116 useful for early stages of configuration, for setting up stuff that might be
117 required for netref resolution.
119 raise NotImplementedError
121 def do_configure(self):
122 """After do_configure elements are configured"""
123 raise NotImplementedError
125 def do_prestart(self):
126 """Before do_start elements are prestart-configured"""
127 raise NotImplementedError
129 def do_cross_connect_init(self, cross_data):
131 After do_cross_connect_init initiation of all external connections
132 between different testbed elements is performed
134 raise NotImplementedError
136 def do_cross_connect_compl(self, cross_data):
138 After do_cross_connect_compl completion of all external connections
139 between different testbed elements is performed
141 raise NotImplementedError
144 raise NotImplementedError
147 raise NotImplementedError
151 On testbed recovery (if recovery is a supported policy), the controller
152 instance will be re-created and the following sequence invoked:
156 <cross-connection methods>
158 Start will not be called, and after cross connection invocations,
159 the testbed is supposed to be fully functional again.
161 raise NotImplementedError
163 def set(self, guid, name, value, time = TIME_NOW):
164 raise NotImplementedError
166 def get(self, guid, name, time = TIME_NOW):
167 raise NotImplementedError
169 def get_route(self, guid, index, attribute):
173 guid: guid of box to query
174 index: number of routing entry to fetch
175 attribute: one of Destination, NextHop, NetPrefix
177 raise NotImplementedError
179 def get_address(self, guid, index, attribute='Address'):
183 guid: guid of box to query
184 index: number of inteface to select
185 attribute: one of Address, NetPrefix, Broadcast
187 raise NotImplementedError
189 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
190 raise NotImplementedError
192 def get_factory_id(self, guid):
193 raise NotImplementedError
195 def action(self, time, guid, action):
196 raise NotImplementedError
198 def status(self, guid):
199 raise NotImplementedError
201 def trace(self, guid, trace_id, attribute='value'):
202 raise NotImplementedError
204 def traces_info(self):
205 """ dictionary of dictionaries:
211 filesize = size in bytes,
215 raise NotImplementedError
218 raise NotImplementedError
220 class ExperimentController(object):
221 def __init__(self, experiment_xml, root_dir):
222 self._experiment_design_xml = experiment_xml
223 self._experiment_execute_xml = None
224 self._testbeds = dict()
225 self._deployment_config = dict()
226 self._netrefs = collections.defaultdict(set)
227 self._testbed_netrefs = collections.defaultdict(set)
228 self._cross_data = dict()
229 self._root_dir = root_dir
230 self._netreffed_testbeds = set()
231 self._guids_in_testbed_cache = dict()
233 if experiment_xml is None and root_dir is not None:
235 self.load_experiment_xml()
236 self.load_execute_xml()
238 self.persist_experiment_xml()
241 def experiment_design_xml(self):
242 return self._experiment_design_xml
245 def experiment_execute_xml(self):
246 return self._experiment_execute_xml
251 for testbed_guid in self._testbeds.keys():
252 _guids = self._guids_in_testbed(testbed_guid)
257 def persist_experiment_xml(self):
258 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
259 f = open(xml_path, "w")
260 f.write(self._experiment_design_xml)
263 def persist_execute_xml(self):
264 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
265 f = open(xml_path, "w")
266 f.write(self._experiment_execute_xml)
269 def load_experiment_xml(self):
270 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
271 f = open(xml_path, "r")
272 self._experiment_design_xml = f.read()
275 def load_execute_xml(self):
276 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
277 f = open(xml_path, "r")
278 self._experiment_execute_xml = f.read()
281 def trace(self, guid, trace_id, attribute='value'):
282 testbed = self._testbed_for_guid(guid)
284 return testbed.trace(guid, trace_id, attribute)
285 raise RuntimeError("No element exists with guid %d" % guid)
287 def traces_info(self):
289 for guid, testbed in self._testbeds.iteritems():
290 tinfo = testbed.traces_info()
292 traces_info[guid] = testbed.traces_info()
296 def _parallel(callables):
299 @functools.wraps(callable)
300 def wrapped(*p, **kw):
305 traceback.print_exc(file=sys.stderr)
306 excs.append(sys.exc_info())
308 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
309 for thread in threads:
311 for thread in threads:
314 eTyp, eVal, eLoc = exc
315 raise eTyp, eVal, eLoc
320 def _start(self, recover = False):
321 parser = XmlExperimentParser()
324 xml = self._experiment_execute_xml
326 xml = self._experiment_design_xml
327 data = parser.from_xml_to_data(xml)
329 # instantiate testbed controllers
330 to_recover, to_restart = self._init_testbed_controllers(data, recover)
331 all_restart = set(to_restart)
334 # persist testbed connection data, for potential recovery
335 self._persist_testbed_proxies()
337 # recover recoverable controllers
338 for guid in to_recover:
339 self._testbeds[guid].do_setup()
340 self._testbeds[guid].recover()
342 def steps_to_configure(self, allowed_guids):
343 # perform setup in parallel for all test beds,
344 # wait for all threads to finish
345 self._parallel([testbed.do_setup
346 for guid,testbed in self._testbeds.iteritems()
347 if guid in allowed_guids])
349 # perform create-connect in parallel, wait
350 # (internal connections only)
351 self._parallel([testbed.do_create
352 for guid,testbed in self._testbeds.iteritems()
353 if guid in allowed_guids])
355 self._parallel([testbed.do_connect_init
356 for guid,testbed in self._testbeds.iteritems()
357 if guid in allowed_guids])
359 self._parallel([testbed.do_connect_compl
360 for guid,testbed in self._testbeds.iteritems()
361 if guid in allowed_guids])
363 self._parallel([testbed.do_preconfigure
364 for guid,testbed in self._testbeds.iteritems()
365 if guid in allowed_guids])
368 steps_to_configure(self, to_restart)
370 if self._netreffed_testbeds:
371 # initally resolve netrefs
372 self.do_netrefs(data, fail_if_undefined=False)
374 # rinse and repeat, for netreffed testbeds
375 netreffed_testbeds = set(self._netreffed_testbeds)
377 to_recover, to_restart = self._init_testbed_controllers(data, recover)
378 all_restart.update(to_restart)
381 # persist testbed connection data, for potential recovery
382 self._persist_testbed_proxies()
384 # recover recoverable controllers
385 for guid in to_recover:
386 self._testbeds[guid].do_setup()
387 self._testbeds[guid].recover()
389 # configure dependant testbeds
390 steps_to_configure(self, to_restart)
392 all_restart = [ self._testbeds[guid] for guid in all_restart ]
394 # final netref step, fail if anything's left unresolved
395 self.do_netrefs(data, fail_if_undefined=True)
397 # Only now, that netref dependencies have been solve, it is safe to
398 # program cross_connections
399 self._program_testbed_cross_connections(data)
401 # perform do_configure in parallel for al testbeds
402 # (it's internal configuration for each)
403 self._parallel([testbed.do_configure
404 for testbed in all_restart])
408 #print >>sys.stderr, "DO IT"
412 # cross-connect (cannot be done in parallel)
413 for guid, testbed in self._testbeds.iteritems():
414 cross_data = self._get_cross_data(guid)
415 testbed.do_cross_connect_init(cross_data)
416 for guid, testbed in self._testbeds.iteritems():
417 cross_data = self._get_cross_data(guid)
418 testbed.do_cross_connect_compl(cross_data)
422 # Last chance to configure (parallel on all testbeds)
423 self._parallel([testbed.do_prestart
424 for testbed in all_restart])
429 # update execution xml with execution-specific values
430 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
431 self._update_execute_xml()
432 self.persist_execute_xml()
434 # start experiment (parallel start on all testbeds)
435 self._parallel([testbed.start
436 for testbed in all_restart])
440 def _clear_caches(self):
441 # Cleaning cache for safety.
442 self._guids_in_testbed_cache = dict()
444 def _persist_testbed_proxies(self):
445 TRANSIENT = (DC.RECOVER,)
447 # persist access configuration for all testbeds, so that
448 # recovery mode can reconnect to them if it becomes necessary
449 conf = ConfigParser.RawConfigParser()
450 for testbed_guid, testbed_config in self._deployment_config.iteritems():
451 testbed_guid = str(testbed_guid)
452 conf.add_section(testbed_guid)
453 for attr in testbed_config.get_attribute_list():
454 if attr not in TRANSIENT:
455 conf.set(testbed_guid, attr,
456 testbed_config.get_attribute_value(attr))
458 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
462 def _load_testbed_proxies(self):
464 Attribute.STRING : 'get',
465 Attribute.BOOL : 'getboolean',
466 Attribute.ENUM : 'get',
467 Attribute.DOUBLE : 'getfloat',
468 Attribute.INTEGER : 'getint',
471 TRANSIENT = (DC.RECOVER,)
473 # deferred import because proxy needs
474 # our class definitions to define proxies
475 import nepi.util.proxy as proxy
477 conf = ConfigParser.RawConfigParser()
478 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
479 for testbed_guid in conf.sections():
480 testbed_config = proxy.AccessConfiguration()
481 testbed_guid = str(testbed_guid)
482 for attr in testbed_config.get_attribute_list():
483 if attr not in TRANSIENT:
484 getter = getattr(conf, TYPEMAP.get(
485 testbed_config.get_attribute_type(attr),
487 testbed_config.set_attribute_value(
488 attr, getter(testbed_guid, attr))
490 def _unpersist_testbed_proxies(self):
492 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
494 # Just print exceptions, this is just cleanup
496 ######## BUG ##########
497 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
498 #traceback.print_exc(file=sys.stderr)
500 def _update_execute_xml(self):
502 # For all elements in testbed,
503 # - gather immutable execute-readable attribuets lists
505 # Generate new design description from design xml
506 # (Wait for attributes lists - implicit syncpoint)
508 # For all elements in testbed,
509 # - gather all immutable execute-readable attribute
510 # values, asynchronously
511 # (Wait for attribute values - implicit syncpoint)
513 # For all elements in testbed,
514 # - inject non-None values into new design
515 # Generate execute xml from new design
517 attribute_lists = dict(
518 (testbed_guid, collections.defaultdict(dict))
519 for testbed_guid in self._testbeds
522 for testbed_guid, testbed in self._testbeds.iteritems():
523 guids = self._guids_in_testbed(testbed_guid)
525 attribute_lists[testbed_guid][guid] = \
526 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
528 parser = XmlExperimentParser()
529 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
531 attribute_values = dict(
532 (testbed_guid, collections.defaultdict(dict))
533 for testbed_guid in self._testbeds
536 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
537 testbed = self._testbeds[testbed_guid]
538 for guid, attribute_list in testbed_attribute_lists.iteritems():
539 attribute_list = _undefer(attribute_list)
540 attribute_values[testbed_guid][guid] = dict(
541 (attribute, testbed.get_deferred(guid, attribute))
542 for attribute in attribute_list
545 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
546 for guid, attribute_values in testbed_attribute_values.iteritems():
547 for attribute, value in attribute_values.iteritems():
548 value = _undefer(value)
549 if value is not None:
550 execute_data.add_attribute_data(guid, attribute, value)
552 self._experiment_execute_xml = parser.to_xml(data=execute_data)
555 for testbed in self._testbeds.values():
557 self._unpersist_testbed_proxies()
560 # reload perviously persisted testbed access configurations
561 self._load_testbed_proxies()
563 self._start(recover = True)
565 def is_finished(self, guid):
566 testbed = self._testbed_for_guid(guid)
568 return testbed.status(guid) == AS.STATUS_FINISHED
569 raise RuntimeError("No element exists with guid %d" % guid)
571 def status(self, guid):
572 testbed = self._testbed_for_guid(guid)
574 return testbed.status(guid)
575 raise RuntimeError("No element exists with guid %d" % guid)
577 def set(self, guid, name, value, time = TIME_NOW):
578 testbed = self._testbed_for_guid(guid)
580 testbed.set(guid, name, value, time)
582 raise RuntimeError("No element exists with guid %d" % guid)
584 def get(self, guid, name, time = TIME_NOW):
585 testbed = self._testbed_for_guid(guid)
587 return testbed.get(guid, name, time)
588 raise RuntimeError("No element exists with guid %d" % guid)
590 def get_deferred(self, guid, name, time = TIME_NOW):
591 testbed = self._testbed_for_guid(guid)
593 return testbed.get_deferred(guid, name, time)
594 raise RuntimeError("No element exists with guid %d" % guid)
596 def get_factory_id(self, guid):
597 testbed = self._testbed_for_guid(guid)
599 return testbed.get_factory_id(guid)
600 raise RuntimeError("No element exists with guid %d" % guid)
602 def get_testbed_id(self, guid):
603 testbed = self._testbed_for_guid(guid)
605 return testbed.testbed_id
606 raise RuntimeError("No element exists with guid %d" % guid)
608 def get_testbed_version(self, guid):
609 testbed = self._testbed_for_guid(guid)
611 return testbed.testbed_version
612 raise RuntimeError("No element exists with guid %d" % guid)
616 for testbed in self._testbeds.values():
620 exceptions.append(sys.exc_info())
621 for exc_info in exceptions:
622 raise exc_info[0], exc_info[1], exc_info[2]
624 def _testbed_for_guid(self, guid):
625 for testbed_guid in self._testbeds.keys():
626 if guid in self._guids_in_testbed(testbed_guid):
627 return self._testbeds[testbed_guid]
630 def _guids_in_testbed(self, testbed_guid):
631 if testbed_guid not in self._testbeds:
633 if testbed_guid not in self._guids_in_testbed_cache:
634 self._guids_in_testbed_cache[testbed_guid] = \
635 set(self._testbeds[testbed_guid].guids)
636 return self._guids_in_testbed_cache[testbed_guid]
639 def _netref_component_split(component):
640 match = COMPONENT_PATTERN.match(component)
642 return match.group("kind"), match.group("index")
644 return component, None
646 _NETREF_COMPONENT_GETTERS = {
648 lambda testbed, guid, index, name:
649 testbed.get_address(guid, int(index), name),
651 lambda testbed, guid, index, name:
652 testbed.get_route(guid, int(index), name),
654 lambda testbed, guid, index, name:
655 testbed.trace(guid, index, name),
657 lambda testbed, guid, index, name:
658 testbed.get(guid, name),
661 def resolve_netref_value(self, value, failval = None):
662 match = ATTRIBUTE_PATTERN_BASE.search(value)
664 label = match.group("label")
665 if label.startswith('GUID-'):
666 ref_guid = int(label[5:])
668 expr = match.group("expr")
669 component = (match.group("component") or "")[1:] # skip the dot
670 attribute = match.group("attribute")
672 # split compound components into component kind and index
673 # eg: 'addr[0]' -> ('addr', '0')
674 component, component_index = self._netref_component_split(component)
676 # find object and resolve expression
677 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
678 if component not in self._NETREF_COMPONENT_GETTERS:
679 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
680 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
683 ref_value = self._NETREF_COMPONENT_GETTERS[component](
684 ref_testbed, ref_guid, component_index, attribute)
686 return value.replace(match.group(), ref_value)
687 # couldn't find value
690 def do_netrefs(self, data, fail_if_undefined = False):
692 for (testbed_guid, guid), attrs in self._netrefs.items():
693 testbed = self._testbeds.get(testbed_guid)
694 if testbed is not None:
695 for name in set(attrs):
696 value = testbed.get(guid, name)
697 if isinstance(value, basestring):
698 ref_value = self.resolve_netref_value(value)
699 if ref_value is not None:
700 testbed.set(guid, name, ref_value)
702 elif fail_if_undefined:
703 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
705 del self._netrefs[(testbed_guid, guid)]
708 for testbed_guid, attrs in self._testbed_netrefs.items():
709 tb_data = dict(data.get_attribute_data(testbed_guid))
711 for name in set(attrs):
712 value = tb_data.get(name)
713 if isinstance(value, basestring):
714 ref_value = self.resolve_netref_value(value)
715 if ref_value is not None:
716 data.set_attribute_data(testbed_guid, name, ref_value)
718 elif fail_if_undefined:
719 raise ValueError, "Unresolvable netref in: %r" % (value,)
721 del self._testbed_netrefs[testbed_guid]
724 def _init_testbed_controllers(self, data, recover = False):
725 blacklist_testbeds = set(self._testbeds)
726 element_guids = list()
728 data_guids = data.guids
732 # gather label associations
733 for guid in data_guids:
734 if not data.is_testbed_data(guid):
735 (testbed_guid, factory_id) = data.get_box_data(guid)
736 label = data.get_attribute_data(guid, "label")
737 if label is not None:
738 if label in label_guids:
739 raise RuntimeError, "Label %r is not unique" % (label,)
740 label_guids[label] = guid
742 # create testbed controllers
743 for guid in data_guids:
744 if data.is_testbed_data(guid):
745 if guid not in self._testbeds:
747 self._create_testbed_controller(
748 guid, data, element_guids, recover)
751 blacklist_testbeds.add(guid)
756 policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
757 if policy == DC.POLICY_FAIL:
759 elif policy == DC.POLICY_RECOVER:
760 self._create_testbed_controller(
761 guid, data, element_guids, False)
763 elif policy == DC.POLICY_RESTART:
764 self._create_testbed_controller(
765 guid, data, element_guids, False)
772 # queue programmable elements
773 # - that have not been programmed already (blacklist_testbeds)
774 # - including recovered or restarted testbeds
775 # - but those that have no unresolved netrefs
776 for guid in data_guids:
777 if not data.is_testbed_data(guid):
778 (testbed_guid, factory_id) = data.get_box_data(guid)
779 if testbed_guid not in blacklist_testbeds:
780 element_guids.append(guid)
782 # replace references to elements labels for its guid
783 self._resolve_labels(data, data_guids, label_guids)
785 # program testbed controllers
787 self._program_testbed_controllers(element_guids, data)
789 return to_recover, to_restart
791 def _resolve_labels(self, data, data_guids, label_guids):
792 netrefs = self._netrefs
793 testbed_netrefs = self._testbed_netrefs
794 for guid in data_guids:
795 for name, value in data.get_attribute_data(guid):
796 if isinstance(value, basestring):
797 match = ATTRIBUTE_PATTERN_BASE.search(value)
799 label = match.group("label")
800 if not label.startswith('GUID-'):
801 ref_guid = label_guids.get(label)
802 if ref_guid is not None:
803 value = ATTRIBUTE_PATTERN_BASE.sub(
804 ATTRIBUTE_PATTERN_GUID_SUB % dict(
805 guid = 'GUID-%d' % (ref_guid,),
806 expr = match.group("expr"),
809 data.set_attribute_data(guid, name, value)
811 # memorize which guid-attribute pairs require
812 # postprocessing, to avoid excessive controller-testbed
813 # communication at configuration time
814 # (which could require high-latency network I/O)
815 if not data.is_testbed_data(guid):
816 (testbed_guid, factory_id) = data.get_box_data(guid)
817 netrefs[(testbed_guid, guid)].add(name)
819 testbed_netrefs[guid].add(name)
821 def _create_testbed_controller(self, guid, data, element_guids, recover):
822 (testbed_id, testbed_version) = data.get_testbed_data(guid)
823 deployment_config = self._deployment_config.get(guid)
825 # deferred import because proxy needs
826 # our class definitions to define proxies
827 import nepi.util.proxy as proxy
829 if deployment_config is None:
831 deployment_config = proxy.AccessConfiguration()
833 for (name, value) in data.get_attribute_data(guid):
834 if value is not None and deployment_config.has_attribute(name):
835 # if any deployment config attribute has a netref, we can't
836 # create this controller yet
837 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
838 # remember to re-issue this one
839 self._netreffed_testbeds.add(guid)
842 # copy deployment config attribute
843 deployment_config.set_attribute_value(name, value)
846 self._deployment_config[guid] = deployment_config
848 if deployment_config is not None:
849 # force recovery mode
850 deployment_config.set_attribute_value("recover",recover)
852 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
854 for (name, value) in data.get_attribute_data(guid):
855 testbed.defer_configure(name, value)
856 self._testbeds[guid] = testbed
857 if guid in self._netreffed_testbeds:
858 self._netreffed_testbeds.remove(guid)
860 def _program_testbed_controllers(self, element_guids, data):
861 def resolve_create_netref(data, guid, name, value):
862 # Try to resolve create-time netrefs, if possible
863 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
865 nuvalue = self.resolve_netref_value(value)
867 # Any trouble means we're not in shape to resolve the netref yet
869 if nuvalue is not None:
870 # Only if we succeed we remove the netref deferral entry
872 data.set_attribute_data(guid, name, value)
873 if (testbed_guid, guid) in self._netrefs:
874 self._netrefs[(testbed_guid, guid)].discard(name)
877 for guid in element_guids:
878 (testbed_guid, factory_id) = data.get_box_data(guid)
879 testbed = self._testbeds.get(testbed_guid)
880 if testbed is not None:
882 testbed.defer_create(guid, factory_id)
884 for (name, value) in data.get_attribute_data(guid):
885 value = resolve_create_netref(data, guid, name, value)
886 testbed.defer_create_set(guid, name, value)
888 for guid in element_guids:
889 (testbed_guid, factory_id) = data.get_box_data(guid)
890 testbed = self._testbeds.get(testbed_guid)
891 if testbed is not None:
893 for trace_id in data.get_trace_data(guid):
894 testbed.defer_add_trace(guid, trace_id)
896 for (address, netprefix, broadcast) in data.get_address_data(guid):
898 testbed.defer_add_address(guid, address, netprefix,
901 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
902 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
903 # store connections data
904 for (connector_type_name, other_guid, other_connector_type_name) \
905 in data.get_connection_data(guid):
906 (other_testbed_guid, other_factory_id) = data.get_box_data(
908 if testbed_guid == other_testbed_guid:
909 # each testbed should take care of enforcing internal
910 # connection simmetry, so each connection is only
911 # added in one direction
912 testbed.defer_connect(guid, connector_type_name,
913 other_guid, other_connector_type_name)
915 def _program_testbed_cross_connections(self, data):
916 data_guids = data.guids
917 for guid in data_guids:
918 if not data.is_testbed_data(guid):
919 (testbed_guid, factory_id) = data.get_box_data(guid)
920 testbed = self._testbeds.get(testbed_guid)
921 if testbed is not None:
922 for (connector_type_name, cross_guid, cross_connector_type_name) \
923 in data.get_connection_data(guid):
924 (testbed_guid, factory_id) = data.get_box_data(guid)
925 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
927 if testbed_guid != cross_testbed_guid:
928 cross_testbed = self._testbeds[cross_testbed_guid]
929 cross_testbed_id = cross_testbed.testbed_id
930 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
931 cross_testbed_guid, cross_testbed_id, cross_factory_id,
932 cross_connector_type_name)
933 # save cross data for later
934 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
937 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
938 if testbed_guid not in self._cross_data:
939 self._cross_data[testbed_guid] = dict()
940 if cross_testbed_guid not in self._cross_data[testbed_guid]:
941 self._cross_data[testbed_guid][cross_testbed_guid] = set()
942 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
944 def _get_cross_data(self, testbed_guid):
946 if not testbed_guid in self._cross_data:
949 # fetch attribute lists in one batch
950 attribute_lists = dict()
951 for cross_testbed_guid, guid_list in \
952 self._cross_data[testbed_guid].iteritems():
953 cross_testbed = self._testbeds[cross_testbed_guid]
954 for cross_guid in guid_list:
955 attribute_lists[(cross_testbed_guid, cross_guid)] = \
956 cross_testbed.get_attribute_list_deferred(cross_guid)
958 # fetch attribute values in another batch
959 for cross_testbed_guid, guid_list in \
960 self._cross_data[testbed_guid].iteritems():
961 cross_data[cross_testbed_guid] = dict()
962 cross_testbed = self._testbeds[cross_testbed_guid]
963 for cross_guid in guid_list:
964 elem_cross_data = dict(
966 _testbed_guid = cross_testbed_guid,
967 _testbed_id = cross_testbed.testbed_id,
968 _testbed_version = cross_testbed.testbed_version)
969 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
970 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
971 for attr_name in attribute_list:
972 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
973 elem_cross_data[attr_name] = attr_value
975 # undefer all values - we'll have to serialize them probably later
976 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
977 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
978 for attr_name, attr_value in elem_cross_data.iteritems():
979 elem_cross_data[attr_name] = _undefer(attr_value)