2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
# Matches a netref expression embedded in an attribute value:
#   {#[<label>].[<attribute>]#}
# optionally with one indexed component between label and attribute:
#   {#[<label>].addr[0].[<attribute>]#}
# Named groups: label, expr (everything after the label), component
# (including its leading dot, or None) and attribute.
# The separator before "[<attribute>]" is a literal dot; it used to be an
# unescaped "." which accepted any character in that position.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template used to rewrite a netref so that its label is a GUID label.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as 'addr[0]' into its kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20 def _undefer(deferred):
21 if hasattr(deferred, '_get'):
22 return deferred._get()
27 class TestbedController(object):
28 def __init__(self, testbed_id, testbed_version):
29 self._testbed_id = testbed_id
30 self._testbed_version = testbed_version
34 return self._testbed_id
37 def testbed_version(self):
38 return self._testbed_version
42 raise NotImplementedError
44 def defer_configure(self, name, value):
45 """Instructs setting a configuartion attribute for the testbed instance"""
46 raise NotImplementedError
48 def defer_create(self, guid, factory_id):
49 """Instructs creation of element """
50 raise NotImplementedError
52 def defer_create_set(self, guid, name, value):
53 """Instructs setting an initial attribute on an element"""
54 raise NotImplementedError
56 def defer_factory_set(self, guid, name, value):
57 """Instructs setting an attribute on a factory"""
58 raise NotImplementedError
60 def defer_connect(self, guid1, connector_type_name1, guid2,
61 connector_type_name2):
62 """Instructs creation of a connection between the given connectors"""
63 raise NotImplementedError
65 def defer_cross_connect(self,
66 guid, connector_type_name,
67 cross_guid, cross_testbed_guid,
68 cross_testbed_id, cross_factory_id,
69 cross_connector_type_name):
71 Instructs creation of a connection between the given connectors
72 of different testbed instances
74 raise NotImplementedError
76 def defer_add_trace(self, guid, trace_id):
77 """Instructs the addition of a trace"""
78 raise NotImplementedError
80 def defer_add_address(self, guid, address, netprefix, broadcast):
81 """Instructs the addition of an address"""
82 raise NotImplementedError
84 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
85 """Instructs the addition of a route"""
86 raise NotImplementedError
89 """After do_setup the testbed initial configuration is done"""
90 raise NotImplementedError
94 After do_create all instructed elements are created and
97 raise NotImplementedError
99 def do_connect_init(self):
101 After do_connect_init all internal connections between testbed elements
104 raise NotImplementedError
106 def do_connect_compl(self):
108 After do_connect all internal connections between testbed elements
111 raise NotImplementedError
113 def do_preconfigure(self):
115 Done just before resolving netrefs, after connection, before cross connections,
116 useful for early stages of configuration, for setting up stuff that might be
117 required for netref resolution.
119 raise NotImplementedError
121 def do_configure(self):
122 """After do_configure elements are configured"""
123 raise NotImplementedError
125 def do_prestart(self):
126 """Before do_start elements are prestart-configured"""
127 raise NotImplementedError
129 def do_cross_connect_init(self, cross_data):
131 After do_cross_connect_init initiation of all external connections
132 between different testbed elements is performed
134 raise NotImplementedError
136 def do_cross_connect_compl(self, cross_data):
138 After do_cross_connect_compl completion of all external connections
139 between different testbed elements is performed
141 raise NotImplementedError
144 raise NotImplementedError
147 raise NotImplementedError
def set(self, guid, name, value, time = TIME_NOW):
    """Abstract setter for attribute ``name`` of element ``guid``.

    ``time`` defaults to TIME_NOW. Concrete testbed controllers must
    override this method.
    """
    raise NotImplementedError()
def get(self, guid, name, time = TIME_NOW):
    """Abstract getter for attribute ``name`` of element ``guid``.

    ``time`` defaults to TIME_NOW. Concrete testbed controllers must
    override this method.
    """
    raise NotImplementedError()
155 def get_route(self, guid, index, attribute):
159 guid: guid of box to query
160 index: number of routing entry to fetch
161 attribute: one of Destination, NextHop, NetPrefix
163 raise NotImplementedError
165 def get_address(self, guid, index, attribute='Address'):
169 guid: guid of box to query
170 index: number of interface to select
171 attribute: one of Address, NetPrefix, Broadcast
173 raise NotImplementedError
175 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
176 raise NotImplementedError
178 def get_factory_id(self, guid):
179 raise NotImplementedError
181 def action(self, time, guid, action):
182 raise NotImplementedError
184 def status(self, guid):
185 raise NotImplementedError
187 def trace(self, guid, trace_id, attribute='value'):
188 raise NotImplementedError
190 def traces_info(self):
191 """ dictionary of dictionaries:
197 filesize = size in bytes,
201 raise NotImplementedError
204 raise NotImplementedError
206 class ExperimentController(object):
207 def __init__(self, experiment_xml, root_dir):
208 self._experiment_design_xml = experiment_xml
209 self._experiment_execute_xml = None
210 self._testbeds = dict()
211 self._deployment_config = dict()
212 self._netrefs = collections.defaultdict(set)
213 self._testbed_netrefs = collections.defaultdict(set)
214 self._cross_data = dict()
215 self._root_dir = root_dir
216 self._netreffed_testbeds = set()
217 self._guids_in_testbed_cache = dict()
219 if experiment_xml is None and root_dir is not None:
221 self.load_experiment_xml()
222 self.load_execute_xml()
224 self.persist_experiment_xml()
227 def experiment_design_xml(self):
228 return self._experiment_design_xml
231 def experiment_execute_xml(self):
232 return self._experiment_execute_xml
237 for testbed_guid in self._testbeds.keys():
238 _guids = self._guids_in_testbed(testbed_guid)
243 def persist_experiment_xml(self):
244 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
245 f = open(xml_path, "w")
246 f.write(self._experiment_design_xml)
249 def persist_execute_xml(self):
250 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
251 f = open(xml_path, "w")
252 f.write(self._experiment_execute_xml)
255 def load_experiment_xml(self):
256 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
257 f = open(xml_path, "r")
258 self._experiment_design_xml = f.read()
261 def load_execute_xml(self):
262 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
263 f = open(xml_path, "r")
264 self._experiment_execute_xml = f.read()
267 def trace(self, guid, trace_id, attribute='value'):
268 testbed = self._testbed_for_guid(guid)
270 return testbed.trace(guid, trace_id, attribute)
271 raise RuntimeError("No element exists with guid %d" % guid)
273 def traces_info(self):
275 for guid, testbed in self._testbeds.iteritems():
276 tinfo = testbed.traces_info()
278 traces_info[guid] = testbed.traces_info()
282 def _parallel(callables):
285 @functools.wraps(callable)
286 def wrapped(*p, **kw):
291 traceback.print_exc(file=sys.stderr)
292 excs.append(sys.exc_info())
294 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
295 for thread in threads:
297 for thread in threads:
300 eTyp, eVal, eLoc = exc
301 raise eTyp, eVal, eLoc
304 parser = XmlExperimentParser()
305 data = parser.from_xml_to_data(self._experiment_design_xml)
307 # instantiate testbed controllers
308 self._init_testbed_controllers(data)
310 # persist testbed connection data, for potential recovery
311 self._persist_testbed_proxies()
def steps_to_configure(self, allowed_guids):
    """Run the pre-configuration steps on every allowed testbed.

    Each step is handed to ``self._parallel`` for all testbeds whose guid
    is in ``allowed_guids``; the next step is only launched once the
    previous ``_parallel`` call has returned.
    """
    step_names = (
        "do_setup",          # testbed setup
        "do_create",         # element creation
        "do_connect_init",   # internal connections, initiation
        "do_connect_compl",  # internal connections, completion
        "do_preconfigure",   # early configuration, ahead of netrefs
    )
    for step_name in step_names:
        bound_steps = []
        for guid, testbed in self._testbeds.iteritems():
            if guid in allowed_guids:
                bound_steps.append(getattr(testbed, step_name))
        self._parallel(bound_steps)
339 steps_to_configure(self, self._testbeds)
341 if self._netreffed_testbeds:
342 # initially resolve netrefs
343 self.do_netrefs(data, fail_if_undefined=False)
345 # rinse and repeat, for netreffed testbeds
346 netreffed_testbeds = set(self._netreffed_testbeds)
348 self._init_testbed_controllers(data)
350 # persist testbed connection data, for potential recovery
351 self._persist_testbed_proxies()
353 # configure dependent testbeds
354 steps_to_configure(self, netreffed_testbeds)
356 # final netref step, fail if anything's left unresolved
357 self.do_netrefs(data, fail_if_undefined=True)
359 # Only now, that netref dependencies have been solved, it is safe to
360 # program cross_connections
361 self._program_testbed_cross_connections(data)
363 # perform do_configure in parallel for all testbeds
364 # (it's internal configuration for each)
365 self._parallel([testbed.do_configure
366 for testbed in self._testbeds.itervalues()])
370 #print >>sys.stderr, "DO IT"
374 # cross-connect (cannot be done in parallel)
375 for guid, testbed in self._testbeds.iteritems():
376 cross_data = self._get_cross_data(guid)
377 testbed.do_cross_connect_init(cross_data)
378 for guid, testbed in self._testbeds.iteritems():
379 cross_data = self._get_cross_data(guid)
380 testbed.do_cross_connect_compl(cross_data)
384 # Last chance to configure (parallel on all testbeds)
385 self._parallel([testbed.do_prestart
386 for testbed in self._testbeds.itervalues()])
390 # update execution xml with execution-specific values
391 # TODO: BUG! Buggy code: cannot handle serializing all attribute values (e.g. tun_key, which is non-ASCII)
392 self._update_execute_xml()
393 self.persist_execute_xml()
395 # start experiment (parallel start on all testbeds)
396 self._parallel([testbed.start
397 for testbed in self._testbeds.itervalues()])
401 def _clear_caches(self):
402 # Cleaning cache for safety.
403 self._guids_in_testbed_cache = dict()
405 def _persist_testbed_proxies(self):
406 TRANSIENT = ('Recover',)
408 # persist access configuration for all testbeds, so that
409 # recovery mode can reconnect to them if it becomes necessary
410 conf = ConfigParser.RawConfigParser()
411 for testbed_guid, testbed_config in self._deployment_config.iteritems():
412 testbed_guid = str(testbed_guid)
413 conf.add_section(testbed_guid)
414 for attr in testbed_config.get_attribute_list():
415 if attr not in TRANSIENT:
416 conf.set(testbed_guid, attr,
417 testbed_config.get_attribute_value(attr))
419 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
423 def _load_testbed_proxies(self):
425 Attribute.STRING : 'get',
426 Attribute.BOOL : 'getboolean',
427 Attribute.ENUM : 'get',
428 Attribute.DOUBLE : 'getfloat',
429 Attribute.INTEGER : 'getint',
432 TRANSIENT = ('Recover',)
434 # deferred import because proxy needs
435 # our class definitions to define proxies
436 import nepi.util.proxy as proxy
438 conf = ConfigParser.RawConfigParser()
439 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
440 for testbed_guid in conf.sections():
441 testbed_config = proxy.AccessConfiguration()
442 testbed_guid = str(testbed_guid)
443 for attr in testbed_config.get_attribute_list():
444 if attr not in TRANSIENT:
445 getter = getattr(conf, TYPEMAP.get(
446 testbed_config.get_attribute_type(attr),
448 testbed_config.set_attribute_value(
449 attr, getter(testbed_guid, attr))
451 def _unpersist_testbed_proxies(self):
453 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
455 # Just print exceptions, this is just cleanup
457 ######## BUG ##########
458 #BUG: If the next line is uncommented pyQt explodes when shutting down the experiment !!!!!!!!
459 #traceback.print_exc(file=sys.stderr)
461 def _update_execute_xml(self):
463 # For all elements in testbed,
464 # - gather immutable execute-readable attribute lists
466 # Generate new design description from design xml
467 # (Wait for attributes lists - implicit syncpoint)
469 # For all elements in testbed,
470 # - gather all immutable execute-readable attribute
471 # values, asynchronously
472 # (Wait for attribute values - implicit syncpoint)
474 # For all elements in testbed,
475 # - inject non-None values into new design
476 # Generate execute xml from new design
478 attribute_lists = dict(
479 (testbed_guid, collections.defaultdict(dict))
480 for testbed_guid in self._testbeds
483 for testbed_guid, testbed in self._testbeds.iteritems():
484 guids = self._guids_in_testbed(testbed_guid)
486 attribute_lists[testbed_guid][guid] = \
487 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
489 parser = XmlExperimentParser()
490 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
492 attribute_values = dict(
493 (testbed_guid, collections.defaultdict(dict))
494 for testbed_guid in self._testbeds
497 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
498 testbed = self._testbeds[testbed_guid]
499 for guid, attribute_list in testbed_attribute_lists.iteritems():
500 attribute_list = _undefer(attribute_list)
501 attribute_values[testbed_guid][guid] = dict(
502 (attribute, testbed.get_deferred(guid, attribute))
503 for attribute in attribute_list
506 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
507 for guid, attribute_values in testbed_attribute_values.iteritems():
508 for attribute, value in attribute_values.iteritems():
509 value = _undefer(value)
510 if value is not None:
511 execute_data.add_attribute_data(guid, attribute, value)
513 self._experiment_execute_xml = parser.to_xml(data=execute_data)
516 for testbed in self._testbeds.values():
518 self._unpersist_testbed_proxies()
521 # reload previously persisted testbed access configurations
522 self._load_testbed_proxies()
524 # Parse experiment xml
525 parser = XmlExperimentParser()
526 data = parser.from_xml_to_data(self._experiment_design_xml)
528 # recreate testbed proxies by reconnecting only
529 self._init_testbed_controllers(data, recover = True)
531 # another time, for netrefs
532 self._init_testbed_controllers(data, recover = True)
534 print >>sys.stderr, "RECOVERED"
def is_finished(self, guid):
    """Tell whether element ``guid`` reports status ``AS.STATUS_FINISHED``.

    Raises:
        RuntimeError: if no registered testbed contains ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard first: without it a missing guid crashed on None and the
    # RuntimeError below could never be reached.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.status(guid) == AS.STATUS_FINISHED
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute ``name`` of element ``guid`` through its testbed.

    Delegates to the owning testbed controller's ``set``.

    Raises:
        RuntimeError: if no registered testbed contains ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    # Only fail when the guid is unknown; a successful set must not raise.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    testbed.set(guid, name, value, time)
def get(self, guid, name, time = TIME_NOW):
    """Read attribute ``name`` of element ``guid`` through its testbed.

    Delegates to the owning testbed controller's ``get``.

    Raises:
        RuntimeError: if no registered testbed contains ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard first so an unknown guid yields the intended RuntimeError.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get(guid, name, time)
def get_deferred(self, guid, name, time = TIME_NOW):
    """Return whatever the owning testbed's ``get_deferred`` yields for ``name``.

    Raises:
        RuntimeError: if no registered testbed contains ``guid``.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard first so an unknown guid yields the intended RuntimeError.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get_deferred(guid, name, time)
561 def get_factory_id(self, guid):
562 testbed = self._testbed_for_guid(guid)
564 return testbed.get_factory_id(guid)
565 raise RuntimeError("No element exists with guid %d" % guid)
567 def get_testbed_id(self, guid):
568 testbed = self._testbed_for_guid(guid)
570 return testbed.testbed_id
571 raise RuntimeError("No element exists with guid %d" % guid)
573 def get_testbed_version(self, guid):
574 testbed = self._testbed_for_guid(guid)
576 return testbed.testbed_version
577 raise RuntimeError("No element exists with guid %d" % guid)
581 for testbed in self._testbeds.values():
585 exceptions.append(sys.exc_info())
586 for exc_info in exceptions:
587 raise exc_info[0], exc_info[1], exc_info[2]
589 def _testbed_for_guid(self, guid):
590 for testbed_guid in self._testbeds.keys():
591 if guid in self._guids_in_testbed(testbed_guid):
592 return self._testbeds[testbed_guid]
595 def _guids_in_testbed(self, testbed_guid):
596 if testbed_guid not in self._testbeds:
598 if testbed_guid not in self._guids_in_testbed_cache:
599 self._guids_in_testbed_cache[testbed_guid] = \
600 set(self._testbeds[testbed_guid].guids)
601 return self._guids_in_testbed_cache[testbed_guid]
def _netref_component_split(component):
    """Split a compound netref component into ``(kind, index)``.

    eg: 'addr[0]' -> ('addr', '0'). A component that does not match
    COMPONENT_PATTERN (for instance the empty string used when a netref
    has no component) is returned untouched as ``(component, None)``.
    """
    match = COMPONENT_PATTERN.match(component)
    # A failed match is the normal "no index" case, not an error.
    if match is None:
        return component, None
    return match.group("kind"), match.group("index")
611 _NETREF_COMPONENT_GETTERS = {
613 lambda testbed, guid, index, name:
614 testbed.get_address(guid, int(index), name),
616 lambda testbed, guid, index, name:
617 testbed.get_route(guid, int(index), name),
619 lambda testbed, guid, index, name:
620 testbed.trace(guid, index, name),
622 lambda testbed, guid, index, name:
623 testbed.get(guid, name),
626 def resolve_netref_value(self, value, failval = None):
627 match = ATTRIBUTE_PATTERN_BASE.search(value)
629 label = match.group("label")
630 if label.startswith('GUID-'):
631 ref_guid = int(label[5:])
633 expr = match.group("expr")
634 component = (match.group("component") or "")[1:] # skip the dot
635 attribute = match.group("attribute")
637 # split compound components into component kind and index
638 # eg: 'addr[0]' -> ('addr', '0')
639 component, component_index = self._netref_component_split(component)
641 # find object and resolve expression
642 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
643 if component not in self._NETREF_COMPONENT_GETTERS:
644 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
645 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
648 ref_value = self._NETREF_COMPONENT_GETTERS[component](
649 ref_testbed, ref_guid, component_index, attribute)
651 return value.replace(match.group(), ref_value)
652 # couldn't find value
655 def do_netrefs(self, data, fail_if_undefined = False):
657 for (testbed_guid, guid), attrs in self._netrefs.items():
658 testbed = self._testbeds.get(testbed_guid)
659 if testbed is not None:
660 for name in set(attrs):
661 value = testbed.get(guid, name)
662 if isinstance(value, basestring):
663 ref_value = self.resolve_netref_value(value)
664 if ref_value is not None:
665 testbed.set(guid, name, ref_value)
667 elif fail_if_undefined:
668 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
670 del self._netrefs[(testbed_guid, guid)]
673 for testbed_guid, attrs in self._testbed_netrefs.items():
674 tb_data = dict(data.get_attribute_data(testbed_guid))
676 for name in set(attrs):
677 value = tb_data.get(name)
678 if isinstance(value, basestring):
679 ref_value = self.resolve_netref_value(value)
680 if ref_value is not None:
681 data.set_attribute_data(testbed_guid, name, ref_value)
683 elif fail_if_undefined:
684 raise ValueError, "Unresolvable netref in: %r" % (value,)
686 del self._testbed_netrefs[testbed_guid]
689 def _init_testbed_controllers(self, data, recover = False):
690 blacklist_testbeds = set(self._testbeds)
691 element_guids = list()
693 data_guids = data.guids
695 # create testbed controllers
696 for guid in data_guids:
697 if data.is_testbed_data(guid):
698 if guid not in self._testbeds:
699 self._create_testbed_controller(guid, data, element_guids,
702 (testbed_guid, factory_id) = data.get_box_data(guid)
703 if testbed_guid not in blacklist_testbeds:
704 element_guids.append(guid)
705 label = data.get_attribute_data(guid, "label")
706 if label is not None:
707 if label in label_guids:
708 raise RuntimeError, "Label %r is not unique" % (label,)
709 label_guids[label] = guid
711 # replace references to elements labels for its guid
712 self._resolve_labels(data, data_guids, label_guids)
714 # program testbed controllers
716 self._program_testbed_controllers(element_guids, data)
718 def _resolve_labels(self, data, data_guids, label_guids):
719 netrefs = self._netrefs
720 testbed_netrefs = self._testbed_netrefs
721 for guid in data_guids:
722 for name, value in data.get_attribute_data(guid):
723 if isinstance(value, basestring):
724 match = ATTRIBUTE_PATTERN_BASE.search(value)
726 label = match.group("label")
727 if not label.startswith('GUID-'):
728 ref_guid = label_guids.get(label)
729 if ref_guid is not None:
730 value = ATTRIBUTE_PATTERN_BASE.sub(
731 ATTRIBUTE_PATTERN_GUID_SUB % dict(
732 guid = 'GUID-%d' % (ref_guid,),
733 expr = match.group("expr"),
736 data.set_attribute_data(guid, name, value)
738 # memorize which guid-attribute pairs require
739 # postprocessing, to avoid excessive controller-testbed
740 # communication at configuration time
741 # (which could require high-latency network I/O)
742 if not data.is_testbed_data(guid):
743 (testbed_guid, factory_id) = data.get_box_data(guid)
744 netrefs[(testbed_guid, guid)].add(name)
746 testbed_netrefs[guid].add(name)
748 def _create_testbed_controller(self, guid, data, element_guids, recover):
749 (testbed_id, testbed_version) = data.get_testbed_data(guid)
750 deployment_config = self._deployment_config.get(guid)
752 # deferred import because proxy needs
753 # our class definitions to define proxies
754 import nepi.util.proxy as proxy
756 if deployment_config is None:
758 deployment_config = proxy.AccessConfiguration()
760 for (name, value) in data.get_attribute_data(guid):
761 if value is not None and deployment_config.has_attribute(name):
762 # if any deployment config attribute has a netref, we can't
763 # create this controller yet
764 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
765 # remember to re-issue this one
766 self._netreffed_testbeds.add(guid)
769 # copy deployment config attribute
770 deployment_config.set_attribute_value(name, value)
773 self._deployment_config[guid] = deployment_config
775 if deployment_config is not None:
776 # force recovery mode
777 deployment_config.set_attribute_value("recover",recover)
779 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
781 for (name, value) in data.get_attribute_data(guid):
782 testbed.defer_configure(name, value)
783 self._testbeds[guid] = testbed
784 if guid in self._netreffed_testbeds:
785 self._netreffed_testbeds.remove(guid)
787 def _program_testbed_controllers(self, element_guids, data):
788 def resolve_create_netref(data, guid, name, value):
789 # Try to resolve create-time netrefs, if possible
790 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
792 nuvalue = self.resolve_netref_value(value)
794 # Any trouble means we're not in shape to resolve the netref yet
796 if nuvalue is not None:
797 # Only if we succeed we remove the netref deferral entry
799 data.set_attribute_data(guid, name, value)
800 if (testbed_guid, guid) in self._netrefs:
801 self._netrefs[(testbed_guid, guid)].discard(name)
804 for guid in element_guids:
805 (testbed_guid, factory_id) = data.get_box_data(guid)
806 testbed = self._testbeds.get(testbed_guid)
807 if testbed is not None:
809 testbed.defer_create(guid, factory_id)
811 for (name, value) in data.get_attribute_data(guid):
812 value = resolve_create_netref(data, guid, name, value)
813 testbed.defer_create_set(guid, name, value)
815 for guid in element_guids:
816 (testbed_guid, factory_id) = data.get_box_data(guid)
817 testbed = self._testbeds.get(testbed_guid)
818 if testbed is not None:
820 for trace_id in data.get_trace_data(guid):
821 testbed.defer_add_trace(guid, trace_id)
823 for (address, netprefix, broadcast) in data.get_address_data(guid):
825 testbed.defer_add_address(guid, address, netprefix,
828 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
829 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
830 # store connections data
831 for (connector_type_name, other_guid, other_connector_type_name) \
832 in data.get_connection_data(guid):
833 (other_testbed_guid, other_factory_id) = data.get_box_data(
835 if testbed_guid == other_testbed_guid:
836 # each testbed should take care of enforcing internal
837 # connection symmetry, so each connection is only
838 # added in one direction
839 testbed.defer_connect(guid, connector_type_name,
840 other_guid, other_connector_type_name)
842 def _program_testbed_cross_connections(self, data):
843 data_guids = data.guids
844 for guid in data_guids:
845 if not data.is_testbed_data(guid):
846 (testbed_guid, factory_id) = data.get_box_data(guid)
847 testbed = self._testbeds.get(testbed_guid)
848 if testbed is not None:
849 for (connector_type_name, cross_guid, cross_connector_type_name) \
850 in data.get_connection_data(guid):
851 (testbed_guid, factory_id) = data.get_box_data(guid)
852 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
854 if testbed_guid != cross_testbed_guid:
855 cross_testbed = self._testbeds[cross_testbed_guid]
856 cross_testbed_id = cross_testbed.testbed_id
857 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
858 cross_testbed_guid, cross_testbed_id, cross_factory_id,
859 cross_connector_type_name)
860 # save cross data for later
861 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
864 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
865 if testbed_guid not in self._cross_data:
866 self._cross_data[testbed_guid] = dict()
867 if cross_testbed_guid not in self._cross_data[testbed_guid]:
868 self._cross_data[testbed_guid][cross_testbed_guid] = set()
869 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
871 def _get_cross_data(self, testbed_guid):
873 if not testbed_guid in self._cross_data:
876 # fetch attribute lists in one batch
877 attribute_lists = dict()
878 for cross_testbed_guid, guid_list in \
879 self._cross_data[testbed_guid].iteritems():
880 cross_testbed = self._testbeds[cross_testbed_guid]
881 for cross_guid in guid_list:
882 attribute_lists[(cross_testbed_guid, cross_guid)] = \
883 cross_testbed.get_attribute_list_deferred(cross_guid)
885 # fetch attribute values in another batch
886 for cross_testbed_guid, guid_list in \
887 self._cross_data[testbed_guid].iteritems():
888 cross_data[cross_testbed_guid] = dict()
889 cross_testbed = self._testbeds[cross_testbed_guid]
890 for cross_guid in guid_list:
891 elem_cross_data = dict(
893 _testbed_guid = cross_testbed_guid,
894 _testbed_id = cross_testbed.testbed_id,
895 _testbed_version = cross_testbed.testbed_version)
896 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
897 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
898 for attr_name in attribute_list:
899 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
900 elem_cross_data[attr_name] = attr_value
902 # undefer all values - we'll have to serialize them probably later
903 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
904 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
905 for attr_name, attr_value in elem_cross_data.iteritems():
906 elem_cross_data[attr_name] = _undefer(attr_value)