2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
16 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
17 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
18 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20 def _undefer(deferred):
21 if hasattr(deferred, '_get'):
22 return deferred._get()
27 class TestbedController(object):
28 def __init__(self, testbed_id, testbed_version):
29 self._testbed_id = testbed_id
30 self._testbed_version = testbed_version
34 return self._testbed_id
37 def testbed_version(self):
38 return self._testbed_version
42 raise NotImplementedError
44 def defer_configure(self, name, value):
45 """Instructs setting a configuartion attribute for the testbed instance"""
46 raise NotImplementedError
48 def defer_create(self, guid, factory_id):
49 """Instructs creation of element """
50 raise NotImplementedError
52 def defer_create_set(self, guid, name, value):
53 """Instructs setting an initial attribute on an element"""
54 raise NotImplementedError
56 def defer_factory_set(self, guid, name, value):
57 """Instructs setting an attribute on a factory"""
58 raise NotImplementedError
60 def defer_connect(self, guid1, connector_type_name1, guid2,
61 connector_type_name2):
62 """Instructs creation of a connection between the given connectors"""
63 raise NotImplementedError
65 def defer_cross_connect(self,
66 guid, connector_type_name,
67 cross_guid, cross_testbed_guid,
68 cross_testbed_id, cross_factory_id,
69 cross_connector_type_name):
71 Instructs creation of a connection between the given connectors
72 of different testbed instances
74 raise NotImplementedError
76 def defer_add_trace(self, guid, trace_id):
77 """Instructs the addition of a trace"""
78 raise NotImplementedError
80 def defer_add_address(self, guid, address, netprefix, broadcast):
81 """Instructs the addition of an address"""
82 raise NotImplementedError
84 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
85 """Instructs the addition of a route"""
86 raise NotImplementedError
89 """After do_setup the testbed initial configuration is done"""
90 raise NotImplementedError
94 After do_create all instructed elements are created and
97 raise NotImplementedError
99 def do_connect_init(self):
101 After do_connect_init all internal connections between testbed elements
104 raise NotImplementedError
106 def do_connect_compl(self):
108 After do_connect all internal connections between testbed elements
111 raise NotImplementedError
113 def do_preconfigure(self):
115 Done just before resolving netrefs, after connection, before cross connections,
116 useful for early stages of configuration, for setting up stuff that might be
117 required for netref resolution.
119 raise NotImplementedError
121 def do_configure(self):
122 """After do_configure elements are configured"""
123 raise NotImplementedError
125 def do_prestart(self):
126 """Before do_start elements are prestart-configured"""
127 raise NotImplementedError
129 def do_cross_connect_init(self, cross_data):
131 After do_cross_connect_init initiation of all external connections
132 between different testbed elements is performed
134 raise NotImplementedError
136 def do_cross_connect_compl(self, cross_data):
138 After do_cross_connect_compl completion of all external connections
139 between different testbed elements is performed
141 raise NotImplementedError
144 raise NotImplementedError
147 raise NotImplementedError
149 def set(self, guid, name, value, time = TIME_NOW):
150 raise NotImplementedError
152 def get(self, guid, name, time = TIME_NOW):
153 raise NotImplementedError
155 def get_route(self, guid, index, attribute):
159 guid: guid of box to query
160 index: number of routing entry to fetch
161 attribute: one of Destination, NextHop, NetPrefix
163 raise NotImplementedError
165 def get_address(self, guid, index, attribute='Address'):
169 guid: guid of box to query
170 index: number of inteface to select
171 attribute: one of Address, NetPrefix, Broadcast
173 raise NotImplementedError
175 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
176 raise NotImplementedError
178 def get_factory_id(self, guid):
179 raise NotImplementedError
181 def action(self, time, guid, action):
182 raise NotImplementedError
184 def status(self, guid):
185 raise NotImplementedError
187 def trace(self, guid, trace_id, attribute='value'):
188 raise NotImplementedError
190 def traces_info(self):
191 """ dictionary of dictionaries:
197 filesize = size in bytes,
201 raise NotImplementedError
204 raise NotImplementedError
206 class ExperimentController(object):
207 def __init__(self, experiment_xml, root_dir):
208 self._experiment_design_xml = experiment_xml
209 self._experiment_execute_xml = None
210 self._testbeds = dict()
211 self._deployment_config = dict()
212 self._netrefs = collections.defaultdict(set)
213 self._testbed_netrefs = collections.defaultdict(set)
214 self._cross_data = dict()
215 self._root_dir = root_dir
216 self._netreffed_testbeds = set()
217 self._guids_in_testbed_cache = dict()
219 self.persist_experiment_xml()
222 def experiment_design_xml(self):
223 return self._experiment_design_xml
226 def experiment_execute_xml(self):
227 return self._experiment_execute_xml
232 for testbed_guid in self._testbeds.keys():
233 _guids = self._guids_in_testbed(testbed_guid)
238 def persist_experiment_xml(self):
239 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
240 f = open(xml_path, "w")
241 f.write(self._experiment_design_xml)
244 def persist_execute_xml(self):
245 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
246 f = open(xml_path, "w")
247 f.write(self._experiment_execute_xml)
250 def trace(self, guid, trace_id, attribute='value'):
251 testbed = self._testbed_for_guid(guid)
253 return testbed.trace(guid, trace_id, attribute)
254 raise RuntimeError("No element exists with guid %d" % guid)
256 def traces_info(self):
258 for guid, testbed in self._testbeds.iteritems():
259 tinfo = testbed.traces_info()
261 traces_info[guid] = testbed.traces_info()
265 def _parallel(callables):
268 @functools.wraps(callable)
269 def wrapped(*p, **kw):
274 traceback.print_exc(file=sys.stderr)
275 excs.append(sys.exc_info())
277 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
278 for thread in threads:
280 for thread in threads:
283 eTyp, eVal, eLoc = exc
284 raise eTyp, eVal, eLoc
287 parser = XmlExperimentParser()
288 data = parser.from_xml_to_data(self._experiment_design_xml)
290 # instantiate testbed controllers
291 self._init_testbed_controllers(data)
293 # persist testbed connection data, for potential recovery
294 self._persist_testbed_proxies()
296 def steps_to_configure(self, allowed_guids):
297 # perform setup in parallel for all test beds,
298 # wait for all threads to finish
299 self._parallel([testbed.do_setup
300 for guid,testbed in self._testbeds.iteritems()
301 if guid in allowed_guids])
303 # perform create-connect in parallel, wait
304 # (internal connections only)
305 self._parallel([testbed.do_create
306 for guid,testbed in self._testbeds.iteritems()
307 if guid in allowed_guids])
309 self._parallel([testbed.do_connect_init
310 for guid,testbed in self._testbeds.iteritems()
311 if guid in allowed_guids])
313 self._parallel([testbed.do_connect_compl
314 for guid,testbed in self._testbeds.iteritems()
315 if guid in allowed_guids])
317 self._parallel([testbed.do_preconfigure
318 for guid,testbed in self._testbeds.iteritems()
319 if guid in allowed_guids])
322 steps_to_configure(self, self._testbeds)
324 if self._netreffed_testbeds:
325 # initally resolve netrefs
326 self.do_netrefs(data, fail_if_undefined=False)
328 # rinse and repeat, for netreffed testbeds
329 netreffed_testbeds = set(self._netreffed_testbeds)
331 self._init_testbed_controllers(data)
333 # persist testbed connection data, for potential recovery
334 self._persist_testbed_proxies()
336 # configure dependant testbeds
337 steps_to_configure(self, netreffed_testbeds)
339 # final netref step, fail if anything's left unresolved
340 self.do_netrefs(data, fail_if_undefined=True)
342 # Only now, that netref dependencies have been solve, it is safe to
343 # program cross_connections
344 self._program_testbed_cross_connections(data)
346 # perform do_configure in parallel for al testbeds
347 # (it's internal configuration for each)
348 self._parallel([testbed.do_configure
349 for testbed in self._testbeds.itervalues()])
353 #print >>sys.stderr, "DO IT"
357 # cross-connect (cannot be done in parallel)
358 for guid, testbed in self._testbeds.iteritems():
359 cross_data = self._get_cross_data(guid)
360 testbed.do_cross_connect_init(cross_data)
361 for guid, testbed in self._testbeds.iteritems():
362 cross_data = self._get_cross_data(guid)
363 testbed.do_cross_connect_compl(cross_data)
367 # Last chance to configure (parallel on all testbeds)
368 self._parallel([testbed.do_prestart
369 for testbed in self._testbeds.itervalues()])
373 # update execution xml with execution-specific values
374 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
375 self._update_execute_xml()
376 self.persist_execute_xml()
378 # start experiment (parallel start on all testbeds)
379 self._parallel([testbed.start
380 for testbed in self._testbeds.itervalues()])
384 def _clear_caches(self):
385 # Cleaning cache for safety.
386 self._guids_in_testbed_cache = dict()
388 def _persist_testbed_proxies(self):
389 TRANSIENT = ('Recover',)
391 # persist access configuration for all testbeds, so that
392 # recovery mode can reconnect to them if it becomes necessary
393 conf = ConfigParser.RawConfigParser()
394 for testbed_guid, testbed_config in self._deployment_config.iteritems():
395 testbed_guid = str(testbed_guid)
396 conf.add_section(testbed_guid)
397 for attr in testbed_config.get_attribute_list():
398 if attr not in TRANSIENT:
399 conf.set(testbed_guid, attr,
400 testbed_config.get_attribute_value(attr))
402 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
406 def _load_testbed_proxies(self):
411 BOOLEAN : 'getboolean',
414 # deferred import because proxy needs
415 # our class definitions to define proxies
416 import nepi.util.proxy as proxy
418 conf = ConfigParser.RawConfigParser()
419 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
420 for testbed_guid in conf.sections():
421 testbed_config = proxy.AccessConfiguration()
422 for attr in conf.options(testbed_guid):
423 testbed_config.set_attribute_value(attr,
424 conf.get(testbed_guid, attr) )
426 testbed_guid = str(testbed_guid)
427 conf.add_section(testbed_guid)
428 for attr in testbed_config.get_attribute_list():
429 if attr not in TRANSIENT:
430 getter = getattr(conf, TYPEMAP.get(
431 testbed_config.get_attribute_type(attr),
433 testbed_config.set_attribute_value(
434 testbed_guid, attr, getter(attr))
436 def _unpersist_testbed_proxies(self):
438 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
440 # Just print exceptions, this is just cleanup
442 ######## BUG ##########
443 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
444 #traceback.print_exc(file=sys.stderr)
446 def _update_execute_xml(self):
448 # For all elements in testbed,
449 # - gather immutable execute-readable attribuets lists
451 # Generate new design description from design xml
452 # (Wait for attributes lists - implicit syncpoint)
454 # For all elements in testbed,
455 # - gather all immutable execute-readable attribute
456 # values, asynchronously
457 # (Wait for attribute values - implicit syncpoint)
459 # For all elements in testbed,
460 # - inject non-None values into new design
461 # Generate execute xml from new design
463 attribute_lists = dict(
464 (testbed_guid, collections.defaultdict(dict))
465 for testbed_guid in self._testbeds
468 for testbed_guid, testbed in self._testbeds.iteritems():
469 guids = self._guids_in_testbed(testbed_guid)
471 attribute_lists[testbed_guid][guid] = \
472 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
474 parser = XmlExperimentParser()
475 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
477 attribute_values = dict(
478 (testbed_guid, collections.defaultdict(dict))
479 for testbed_guid in self._testbeds
482 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
483 testbed = self._testbeds[testbed_guid]
484 for guid, attribute_list in testbed_attribute_lists.iteritems():
485 attribute_list = _undefer(attribute_list)
486 attribute_values[testbed_guid][guid] = dict(
487 (attribute, testbed.get_deferred(guid, attribute))
488 for attribute in attribute_list
491 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
492 for guid, attribute_values in testbed_attribute_values.iteritems():
493 for attribute, value in attribute_values.iteritems():
494 value = _undefer(value)
495 if value is not None:
496 execute_data.add_attribute_data(guid, attribute, value)
498 self._experiment_execute_xml = parser.to_xml(data=execute_data)
501 for testbed in self._testbeds.values():
503 self._unpersist_testbed_proxies()
506 # reload perviously persisted testbed access configurations
507 self._load_testbed_proxies()
509 # recreate testbed proxies by reconnecting only
510 self._init_testbed_controllers(recover = True)
512 # another time, for netrefs
513 self._init_testbed_controllers(recover = True)
515 def is_finished(self, guid):
516 testbed = self._testbed_for_guid(guid)
518 return testbed.status(guid) == AS.STATUS_FINISHED
519 raise RuntimeError("No element exists with guid %d" % guid)
521 def set(self, guid, name, value, time = TIME_NOW):
522 testbed = self._testbed_for_guid(guid)
524 testbed.set(guid, name, value, time)
526 raise RuntimeError("No element exists with guid %d" % guid)
528 def get(self, guid, name, time = TIME_NOW):
529 testbed = self._testbed_for_guid(guid)
531 return testbed.get(guid, name, time)
532 raise RuntimeError("No element exists with guid %d" % guid)
534 def get_deferred(self, guid, name, time = TIME_NOW):
535 testbed = self._testbed_for_guid(guid)
537 return testbed.get_deferred(guid, name, time)
538 raise RuntimeError("No element exists with guid %d" % guid)
540 def get_factory_id(self, guid):
541 testbed = self._testbed_for_guid(guid)
543 return testbed.get_factory_id(guid)
544 raise RuntimeError("No element exists with guid %d" % guid)
546 def get_testbed_id(self, guid):
547 testbed = self._testbed_for_guid(guid)
549 return testbed.testbed_id
550 raise RuntimeError("No element exists with guid %d" % guid)
552 def get_testbed_version(self, guid):
553 testbed = self._testbed_for_guid(guid)
555 return testbed.testbed_version
556 raise RuntimeError("No element exists with guid %d" % guid)
560 for testbed in self._testbeds.values():
564 exceptions.append(sys.exc_info())
565 for exc_info in exceptions:
566 raise exc_info[0], exc_info[1], exc_info[2]
568 def _testbed_for_guid(self, guid):
569 for testbed_guid in self._testbeds.keys():
570 if guid in self._guids_in_testbed(testbed_guid):
571 return self._testbeds[testbed_guid]
574 def _guids_in_testbed(self, testbed_guid):
575 if testbed_guid not in self._testbeds:
577 if testbed_guid not in self._guids_in_testbed_cache:
578 self._guids_in_testbed_cache[testbed_guid] = \
579 set(self._testbeds[testbed_guid].guids)
580 return self._guids_in_testbed_cache[testbed_guid]
583 def _netref_component_split(component):
584 match = COMPONENT_PATTERN.match(component)
586 return match.group("kind"), match.group("index")
588 return component, None
590 _NETREF_COMPONENT_GETTERS = {
592 lambda testbed, guid, index, name:
593 testbed.get_address(guid, int(index), name),
595 lambda testbed, guid, index, name:
596 testbed.get_route(guid, int(index), name),
598 lambda testbed, guid, index, name:
599 testbed.trace(guid, index, name),
601 lambda testbed, guid, index, name:
602 testbed.get(guid, name),
605 def resolve_netref_value(self, value, failval = None):
606 match = ATTRIBUTE_PATTERN_BASE.search(value)
608 label = match.group("label")
609 if label.startswith('GUID-'):
610 ref_guid = int(label[5:])
612 expr = match.group("expr")
613 component = (match.group("component") or "")[1:] # skip the dot
614 attribute = match.group("attribute")
616 # split compound components into component kind and index
617 # eg: 'addr[0]' -> ('addr', '0')
618 component, component_index = self._netref_component_split(component)
620 # find object and resolve expression
621 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
622 if component not in self._NETREF_COMPONENT_GETTERS:
623 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
624 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
627 ref_value = self._NETREF_COMPONENT_GETTERS[component](
628 ref_testbed, ref_guid, component_index, attribute)
630 return value.replace(match.group(), ref_value)
631 # couldn't find value
634 def do_netrefs(self, data, fail_if_undefined = False):
636 for (testbed_guid, guid), attrs in self._netrefs.items():
637 testbed = self._testbeds.get(testbed_guid)
638 if testbed is not None:
639 for name in set(attrs):
640 value = testbed.get(guid, name)
641 if isinstance(value, basestring):
642 ref_value = self.resolve_netref_value(value)
643 if ref_value is not None:
644 testbed.set(guid, name, ref_value)
646 elif fail_if_undefined:
647 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
649 del self._netrefs[(testbed_guid, guid)]
652 for testbed_guid, attrs in self._testbed_netrefs.items():
653 tb_data = dict(data.get_attribute_data(testbed_guid))
655 for name in set(attrs):
656 value = tb_data.get(name)
657 if isinstance(value, basestring):
658 ref_value = self.resolve_netref_value(value)
659 if ref_value is not None:
660 data.set_attribute_data(testbed_guid, name, ref_value)
662 elif fail_if_undefined:
663 raise ValueError, "Unresolvable netref in: %r" % (value,)
665 del self._testbed_netrefs[testbed_guid]
668 def _init_testbed_controllers(self, data, recover = False):
669 blacklist_testbeds = set(self._testbeds)
670 element_guids = list()
672 data_guids = data.guids
674 # create testbed controllers
675 for guid in data_guids:
676 if data.is_testbed_data(guid):
677 if guid not in self._testbeds:
678 self._create_testbed_controller(guid, data, element_guids,
681 (testbed_guid, factory_id) = data.get_box_data(guid)
682 if testbed_guid not in blacklist_testbeds:
683 element_guids.append(guid)
684 label = data.get_attribute_data(guid, "label")
685 if label is not None:
686 if label in label_guids:
687 raise RuntimeError, "Label %r is not unique" % (label,)
688 label_guids[label] = guid
690 # replace references to elements labels for its guid
691 self._resolve_labels(data, data_guids, label_guids)
693 # program testbed controllers
695 self._program_testbed_controllers(element_guids, data)
697 def _resolve_labels(self, data, data_guids, label_guids):
698 netrefs = self._netrefs
699 testbed_netrefs = self._testbed_netrefs
700 for guid in data_guids:
701 for name, value in data.get_attribute_data(guid):
702 if isinstance(value, basestring):
703 match = ATTRIBUTE_PATTERN_BASE.search(value)
705 label = match.group("label")
706 if not label.startswith('GUID-'):
707 ref_guid = label_guids.get(label)
708 if ref_guid is not None:
709 value = ATTRIBUTE_PATTERN_BASE.sub(
710 ATTRIBUTE_PATTERN_GUID_SUB % dict(
711 guid = 'GUID-%d' % (ref_guid,),
712 expr = match.group("expr"),
715 data.set_attribute_data(guid, name, value)
717 # memorize which guid-attribute pairs require
718 # postprocessing, to avoid excessive controller-testbed
719 # communication at configuration time
720 # (which could require high-latency network I/O)
721 if not data.is_testbed_data(guid):
722 (testbed_guid, factory_id) = data.get_box_data(guid)
723 netrefs[(testbed_guid, guid)].add(name)
725 testbed_netrefs[guid].add(name)
727 def _create_testbed_controller(self, guid, data, element_guids, recover):
728 (testbed_id, testbed_version) = data.get_testbed_data(guid)
729 deployment_config = self._deployment_config.get(guid)
731 # deferred import because proxy needs
732 # our class definitions to define proxies
733 import nepi.util.proxy as proxy
735 if deployment_config is None:
737 deployment_config = proxy.AccessConfiguration()
739 for (name, value) in data.get_attribute_data(guid):
740 if value is not None and deployment_config.has_attribute(name):
741 # if any deployment config attribute has a netref, we can't
742 # create this controller yet
743 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
744 # remember to re-issue this one
745 self._netreffed_testbeds.add(guid)
748 # copy deployment config attribute
749 deployment_config.set_attribute_value(name, value)
752 self._deployment_config[guid] = deployment_config
754 if deployment_config is not None:
755 # force recovery mode
756 deployment_config.set_attribute_value("recover",recover)
758 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
760 for (name, value) in data.get_attribute_data(guid):
761 testbed.defer_configure(name, value)
762 self._testbeds[guid] = testbed
763 if guid in self._netreffed_testbeds:
764 self._netreffed_testbeds.remove(guid)
766 def _program_testbed_controllers(self, element_guids, data):
767 def resolve_create_netref(data, guid, name, value):
768 # Try to resolve create-time netrefs, if possible
769 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
771 nuvalue = self.resolve_netref_value(value)
773 # Any trouble means we're not in shape to resolve the netref yet
775 if nuvalue is not None:
776 # Only if we succeed we remove the netref deferral entry
778 data.set_attribute_data(guid, name, value)
779 if (testbed_guid, guid) in self._netrefs:
780 self._netrefs[(testbed_guid, guid)].discard(name)
783 for guid in element_guids:
784 (testbed_guid, factory_id) = data.get_box_data(guid)
785 testbed = self._testbeds.get(testbed_guid)
786 if testbed is not None:
788 testbed.defer_create(guid, factory_id)
790 for (name, value) in data.get_attribute_data(guid):
791 value = resolve_create_netref(data, guid, name, value)
792 testbed.defer_create_set(guid, name, value)
794 for guid in element_guids:
795 (testbed_guid, factory_id) = data.get_box_data(guid)
796 testbed = self._testbeds.get(testbed_guid)
797 if testbed is not None:
799 for trace_id in data.get_trace_data(guid):
800 testbed.defer_add_trace(guid, trace_id)
802 for (address, netprefix, broadcast) in data.get_address_data(guid):
804 testbed.defer_add_address(guid, address, netprefix,
807 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
808 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
809 # store connections data
810 for (connector_type_name, other_guid, other_connector_type_name) \
811 in data.get_connection_data(guid):
812 (other_testbed_guid, other_factory_id) = data.get_box_data(
814 if testbed_guid == other_testbed_guid:
815 # each testbed should take care of enforcing internal
816 # connection simmetry, so each connection is only
817 # added in one direction
818 testbed.defer_connect(guid, connector_type_name,
819 other_guid, other_connector_type_name)
821 def _program_testbed_cross_connections(self, data):
822 data_guids = data.guids
823 for guid in data_guids:
824 if not data.is_testbed_data(guid):
825 (testbed_guid, factory_id) = data.get_box_data(guid)
826 testbed = self._testbeds.get(testbed_guid)
827 if testbed is not None:
828 for (connector_type_name, cross_guid, cross_connector_type_name) \
829 in data.get_connection_data(guid):
830 (testbed_guid, factory_id) = data.get_box_data(guid)
831 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
833 if testbed_guid != cross_testbed_guid:
834 cross_testbed = self._testbeds[cross_testbed_guid]
835 cross_testbed_id = cross_testbed.testbed_id
836 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
837 cross_testbed_guid, cross_testbed_id, cross_factory_id,
838 cross_connector_type_name)
839 # save cross data for later
840 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
843 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
844 if testbed_guid not in self._cross_data:
845 self._cross_data[testbed_guid] = dict()
846 if cross_testbed_guid not in self._cross_data[testbed_guid]:
847 self._cross_data[testbed_guid][cross_testbed_guid] = set()
848 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
850 def _get_cross_data(self, testbed_guid):
852 if not testbed_guid in self._cross_data:
855 # fetch attribute lists in one batch
856 attribute_lists = dict()
857 for cross_testbed_guid, guid_list in \
858 self._cross_data[testbed_guid].iteritems():
859 cross_testbed = self._testbeds[cross_testbed_guid]
860 for cross_guid in guid_list:
861 attribute_lists[(cross_testbed_guid, cross_guid)] = \
862 cross_testbed.get_attribute_list_deferred(cross_guid)
864 # fetch attribute values in another batch
865 for cross_testbed_guid, guid_list in \
866 self._cross_data[testbed_guid].iteritems():
867 cross_data[cross_testbed_guid] = dict()
868 cross_testbed = self._testbeds[cross_testbed_guid]
869 for cross_guid in guid_list:
870 elem_cross_data = dict(
872 _testbed_guid = cross_testbed_guid,
873 _testbed_id = cross_testbed.testbed_id,
874 _testbed_version = cross_testbed.testbed_version)
875 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
876 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
877 for attr_name in attribute_list:
878 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
879 elem_cross_data[attr_name] = attr_value
881 # undefer all values - we'll have to serialize them probably later
882 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
883 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
884 for attr_name, attr_value in elem_cross_data.iteritems():
885 elem_cross_data[attr_name] = _undefer(attr_value)