2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
# Matches a netref expression such as "{#[label].addr[0].[Address]#}".
# Named groups:
#   label     - element label, or "GUID-<n>" once labels have been resolved
#   expr      - everything between the label and the closing "#}"
#   component - optional ".addr[i]", ".route[i]" or ".trace[i]" selector
#   attribute - name of the attribute being referenced
# The dot in front of "[attribute]" is escaped so that only a literal "."
# is accepted there (an unescaped "." would match any character).
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template used to rebuild a netref with its label replaced by a guid tag.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into kind ("addr") and index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
def _undefer(deferred):
    """Resolve a possibly-deferred value.

    Objects exposing a ``_get`` method are treated as deferred results and
    resolved by calling it. Anything else is returned unchanged, so callers
    can run mixed collections of plain and deferred values through this
    helper without losing the plain ones.
    """
    if hasattr(deferred, '_get'):
        return deferred._get()
    # Plain value: hand it back as-is instead of falling through to None.
    return deferred
27 class TestbedController(object):
def __init__(self, testbed_id, testbed_version):
    """Store the identifier and version of the testbed being controlled."""
    self._testbed_version = testbed_version
    self._testbed_id = testbed_id
34 return self._testbed_id
37 def testbed_version(self):
38 return self._testbed_version
42 raise NotImplementedError
def defer_configure(self, name, value):
    """Instruct setting a configuration attribute for the testbed instance.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create(self, guid, factory_id):
    """Instruct the creation of an element.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create_set(self, guid, name, value):
    """Instruct setting an initial attribute on an element.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_factory_set(self, guid, name, value):
    """Instruct setting an attribute on a factory.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_connect(self, guid1, connector_type_name1, guid2, connector_type_name2):
    """Instruct the creation of a connection between the given connectors.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_cross_connect(self, guid, connector_type_name, cross_guid,
        cross_testbed_guid, cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """Instruct the creation of a connection between the given connectors
    of different testbed instances.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_trace(self, guid, trace_id):
    """Instruct the addition of a trace.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Instruct the addition of an address.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_route(self, guid, destination, netprefix, nexthop, metric=0):
    """Instruct the addition of a route.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
89 """After do_setup the testbed initial configuration is done"""
90 raise NotImplementedError
94 After do_create all instructed elements are created and
97 raise NotImplementedError
def do_connect_init(self):
    """Initiation step for internal connections between testbed elements.

    NOTE(review): the exact post-condition is not fully visible here;
    presumably the connections are only initiated by this step - confirm.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_connect_compl(self):
    """Completion step for internal connections between testbed elements.

    NOTE(review): the exact post-condition is not fully visible here;
    presumably the connections are established after this step - confirm.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_preconfigure(self):
    """Early configuration hook.

    Done just before resolving netrefs, after connection and before cross
    connections. Useful for early stages of configuration, for setting up
    anything that might be required for netref resolution.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_configure(self):
    """After do_configure, elements are configured.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_prestart(self):
    """Before do_start, elements are prestart-configured.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_cross_connect_init(self, cross_data):
    """After do_cross_connect_init, initiation of all external connections
    between different testbed elements is performed.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_cross_connect_compl(self, cross_data):
    """After do_cross_connect_compl, completion of all external connections
    between different testbed elements is performed.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
144 raise NotImplementedError
147 raise NotImplementedError
151 On testbed recovery (if recovery is a supported policy), the controller
152 instance will be re-created and the following sequence invoked:
155 defer_X - programming the testbed with persisted execution values
156 (not design values). Execution values (ExecImmutable attributes)
157 should be enough to recreate the testbed's state.
159 <cross-connection methods>
161 Start will not be called, and after cross connection invocations,
162 the testbed is supposed to be fully functional again.
164 raise NotImplementedError
def set(self, guid, name, value, time=TIME_NOW):
    """Set attribute ``name`` of element ``guid`` to ``value``.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get(self, guid, name, time=TIME_NOW):
    """Fetch attribute ``name`` of element ``guid``.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_route(self, guid, index, attribute):
    """Query an attribute of a routing entry.

    Params:
        guid: guid of box to query
        index: number of routing entry to fetch
        attribute: one of Destination, NextHop, NetPrefix

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_address(self, guid, index, attribute='Address'):
    """Query an attribute of an address entry.

    Params:
        guid: guid of box to query
        index: number of interface to select
        attribute: one of Address, NetPrefix, Broadcast

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_attribute_list(self, guid, filter_flags=None, exclude=False):
    """List the attributes of element ``guid``.

    NOTE(review): the semantics of ``filter_flags`` / ``exclude`` are not
    visible in this base class - confirm against a concrete controller.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_factory_id(self, guid):
    """Report the factory id of element ``guid``.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def action(self, time, guid, action):
    """Action hook taking a time, an element guid and an action.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def status(self, guid):
    """Report the status of element ``guid``.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
204 def testbed_status(self):
205 raise NotImplementedError
def trace(self, guid, trace_id, attribute='value'):
    """Fetch ``attribute`` of trace ``trace_id`` on element ``guid``.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def traces_info(self):
    """Describe the available traces as a dictionary of dictionaries.

    Among the per-trace entries is ``filesize``, the size in bytes.
    NOTE(review): the full layout of the returned mapping is not visible
    here - confirm against a concrete controller.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
224 raise NotImplementedError
226 class ExperimentController(object):
227 def __init__(self, experiment_xml, root_dir):
228 self._experiment_design_xml = experiment_xml
229 self._experiment_execute_xml = None
230 self._testbeds = dict()
231 self._deployment_config = dict()
232 self._netrefs = collections.defaultdict(set)
233 self._testbed_netrefs = collections.defaultdict(set)
234 self._cross_data = dict()
235 self._root_dir = root_dir
236 self._netreffed_testbeds = set()
237 self._guids_in_testbed_cache = dict()
238 self._failed_testbeds = set()
240 if experiment_xml is None and root_dir is not None:
242 self.load_experiment_xml()
243 self.load_execute_xml()
245 self.persist_experiment_xml()
248 def experiment_design_xml(self):
249 return self._experiment_design_xml
252 def experiment_execute_xml(self):
253 return self._experiment_execute_xml
258 for testbed_guid in self._testbeds.keys():
259 _guids = self._guids_in_testbed(testbed_guid)
def persist_experiment_xml(self):
    """Write the design XML to ``experiment-design.xml`` in the root dir.

    The file is written through a context manager so the handle is flushed
    and closed even if the write raises.
    """
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    with open(xml_path, "w") as f:
        f.write(self._experiment_design_xml)
def persist_execute_xml(self):
    """Write the execution XML to ``experiment-execute.xml`` in the root dir.

    The file is written through a context manager so the handle is flushed
    and closed even if the write raises.
    """
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    with open(xml_path, "w") as f:
        f.write(self._experiment_execute_xml)
def load_experiment_xml(self):
    """Read ``experiment-design.xml`` from the root dir into the design XML.

    The file is read through a context manager so the handle is always
    closed, even if reading raises.
    """
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    with open(xml_path, "r") as f:
        self._experiment_design_xml = f.read()
def load_execute_xml(self):
    """Read ``experiment-execute.xml`` from the root dir into the execution XML.

    The file is read through a context manager so the handle is always
    closed, even if reading raises.
    """
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    with open(xml_path, "r") as f:
        self._experiment_execute_xml = f.read()
288 def trace(self, guid, trace_id, attribute='value'):
289 testbed = self._testbed_for_guid(guid)
291 return testbed.trace(guid, trace_id, attribute)
292 raise RuntimeError("No element exists with guid %d" % guid)
294 def traces_info(self):
296 for guid, testbed in self._testbeds.iteritems():
297 tinfo = testbed.traces_info()
299 traces_info[guid] = testbed.traces_info()
303 def _parallel(callables):
306 @functools.wraps(callable)
307 def wrapped(*p, **kw):
312 traceback.print_exc(file=sys.stderr)
313 excs.append(sys.exc_info())
315 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
316 for thread in threads:
318 for thread in threads:
321 eTyp, eVal, eLoc = exc
322 raise eTyp, eVal, eLoc
327 def _start(self, recover = False):
328 parser = XmlExperimentParser()
331 xml = self._experiment_execute_xml
333 xml = self._experiment_design_xml
334 data = parser.from_xml_to_data(xml)
336 # instantiate testbed controllers
337 to_recover, to_restart = self._init_testbed_controllers(data, recover)
338 all_restart = set(to_restart)
341 # persist testbed connection data, for potential recovery
342 self._persist_testbed_proxies()
344 # recover recoverable controllers
345 for guid in to_recover:
347 self._testbeds[guid].do_setup()
348 self._testbeds[guid].recover()
351 self._failed_testbeds.add(guid)
353 def steps_to_configure(self, allowed_guids):
354 # perform setup in parallel for all test beds,
355 # wait for all threads to finish
356 self._parallel([testbed.do_setup
357 for guid,testbed in self._testbeds.iteritems()
358 if guid in allowed_guids])
360 # perform create-connect in parallel, wait
361 # (internal connections only)
362 self._parallel([testbed.do_create
363 for guid,testbed in self._testbeds.iteritems()
364 if guid in allowed_guids])
366 self._parallel([testbed.do_connect_init
367 for guid,testbed in self._testbeds.iteritems()
368 if guid in allowed_guids])
370 self._parallel([testbed.do_connect_compl
371 for guid,testbed in self._testbeds.iteritems()
372 if guid in allowed_guids])
374 self._parallel([testbed.do_preconfigure
375 for guid,testbed in self._testbeds.iteritems()
376 if guid in allowed_guids])
379 steps_to_configure(self, to_restart)
381 if self._netreffed_testbeds:
382 # initally resolve netrefs
383 self.do_netrefs(data, fail_if_undefined=False)
385 # rinse and repeat, for netreffed testbeds
386 netreffed_testbeds = set(self._netreffed_testbeds)
388 to_recover, to_restart = self._init_testbed_controllers(data, recover)
389 all_restart.update(to_restart)
392 # persist testbed connection data, for potential recovery
393 self._persist_testbed_proxies()
395 # recover recoverable controllers
396 for guid in to_recover:
398 self._testbeds[guid].do_setup()
399 self._testbeds[guid].recover()
402 self._failed_testbeds.add(guid)
404 # configure dependant testbeds
405 steps_to_configure(self, to_restart)
407 all_restart = [ self._testbeds[guid] for guid in all_restart ]
409 # final netref step, fail if anything's left unresolved
410 self.do_netrefs(data, fail_if_undefined=True)
412 # Only now, that netref dependencies have been solve, it is safe to
413 # program cross_connections
414 self._program_testbed_cross_connections(data)
416 # perform do_configure in parallel for al testbeds
417 # (it's internal configuration for each)
418 self._parallel([testbed.do_configure
419 for testbed in all_restart])
423 #print >>sys.stderr, "DO IT"
427 # cross-connect (cannot be done in parallel)
428 for guid, testbed in self._testbeds.iteritems():
429 cross_data = self._get_cross_data(guid)
430 testbed.do_cross_connect_init(cross_data)
431 for guid, testbed in self._testbeds.iteritems():
432 cross_data = self._get_cross_data(guid)
433 testbed.do_cross_connect_compl(cross_data)
437 # Last chance to configure (parallel on all testbeds)
438 self._parallel([testbed.do_prestart
439 for testbed in all_restart])
444 # update execution xml with execution-specific values
445 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
446 self._update_execute_xml()
447 self.persist_execute_xml()
449 # start experiment (parallel start on all testbeds)
450 self._parallel([testbed.start
451 for testbed in all_restart])
def _clear_caches(self):
    """Reset the per-testbed guid cache to a fresh, empty mapping."""
    # Replaced wholesale (not cleared in place), as a safety measure.
    self._guids_in_testbed_cache = {}
459 def _persist_testbed_proxies(self):
460 TRANSIENT = (DC.RECOVER,)
462 # persist access configuration for all testbeds, so that
463 # recovery mode can reconnect to them if it becomes necessary
464 conf = ConfigParser.RawConfigParser()
465 for testbed_guid, testbed_config in self._deployment_config.iteritems():
466 testbed_guid = str(testbed_guid)
467 conf.add_section(testbed_guid)
468 for attr in testbed_config.get_attribute_list():
469 if attr not in TRANSIENT:
470 conf.set(testbed_guid, attr,
471 testbed_config.get_attribute_value(attr))
473 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
477 def _load_testbed_proxies(self):
479 Attribute.STRING : 'get',
480 Attribute.BOOL : 'getboolean',
481 Attribute.ENUM : 'get',
482 Attribute.DOUBLE : 'getfloat',
483 Attribute.INTEGER : 'getint',
486 TRANSIENT = (DC.RECOVER,)
488 # deferred import because proxy needs
489 # our class definitions to define proxies
490 import nepi.util.proxy as proxy
492 conf = ConfigParser.RawConfigParser()
493 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
494 for testbed_guid in conf.sections():
495 testbed_config = proxy.AccessConfiguration()
496 testbed_guid = str(testbed_guid)
497 for attr in testbed_config.get_attribute_list():
498 if attr not in TRANSIENT:
499 getter = getattr(conf, TYPEMAP.get(
500 testbed_config.get_attribute_type(attr),
502 testbed_config.set_attribute_value(
503 attr, getter(testbed_guid, attr))
505 def _unpersist_testbed_proxies(self):
507 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
509 # Just print exceptions, this is just cleanup
511 ######## BUG ##########
512 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
513 #traceback.print_exc(file=sys.stderr)
515 def _update_execute_xml(self):
517 # For all elements in testbed,
518 # - gather immutable execute-readable attribuets lists
520 # Generate new design description from design xml
521 # (Wait for attributes lists - implicit syncpoint)
523 # For all elements in testbed,
524 # - gather all immutable execute-readable attribute
525 # values, asynchronously
526 # (Wait for attribute values - implicit syncpoint)
528 # For all elements in testbed,
529 # - inject non-None values into new design
530 # Generate execute xml from new design
532 attribute_lists = dict(
533 (testbed_guid, collections.defaultdict(dict))
534 for testbed_guid in self._testbeds
537 for testbed_guid, testbed in self._testbeds.iteritems():
538 guids = self._guids_in_testbed(testbed_guid)
540 attribute_lists[testbed_guid][guid] = \
541 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
543 parser = XmlExperimentParser()
544 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
546 attribute_values = dict(
547 (testbed_guid, collections.defaultdict(dict))
548 for testbed_guid in self._testbeds
551 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
552 testbed = self._testbeds[testbed_guid]
553 for guid, attribute_list in testbed_attribute_lists.iteritems():
554 attribute_list = _undefer(attribute_list)
555 attribute_values[testbed_guid][guid] = dict(
556 (attribute, testbed.get_deferred(guid, attribute))
557 for attribute in attribute_list
560 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
561 for guid, attribute_values in testbed_attribute_values.iteritems():
562 for attribute, value in attribute_values.iteritems():
563 value = _undefer(value)
564 if value is not None:
565 execute_data.add_attribute_data(guid, attribute, value)
567 self._experiment_execute_xml = parser.to_xml(data=execute_data)
570 for testbed in self._testbeds.values():
572 self._unpersist_testbed_proxies()
575 # reload perviously persisted testbed access configurations
576 self._failed_testbeds.clear()
577 self._load_testbed_proxies()
579 # re-program testbeds that need recovery
580 self._start(recover = True)
582 def is_finished(self, guid):
583 testbed = self._testbed_for_guid(guid)
585 return testbed.status(guid) == AS.STATUS_FINISHED
586 raise RuntimeError("No element exists with guid %d" % guid)
588 def _testbed_recovery_policy(self, guid, data = None):
590 parser = XmlExperimentParser()
591 data = parser.from_xml_to_data(self._experiment_design_xml)
593 return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
595 def status(self, guid):
596 if guid in self._testbeds:
598 # report testbed status
599 if guid in self._failed_testbeds:
600 return TS.STATUS_FAILED
603 return self._testbeds[guid].status()
605 return TS.STATUS_UNRESPONSIVE
608 testbed = self._testbed_for_guid(guid)
609 if testbed is not None:
610 return testbed.status(guid)
612 return AS.STATUS_UNDETERMINED
614 def set(self, guid, name, value, time = TIME_NOW):
615 testbed = self._testbed_for_guid(guid)
617 testbed.set(guid, name, value, time)
619 raise RuntimeError("No element exists with guid %d" % guid)
621 def get(self, guid, name, time = TIME_NOW):
622 testbed = self._testbed_for_guid(guid)
624 return testbed.get(guid, name, time)
625 raise RuntimeError("No element exists with guid %d" % guid)
627 def get_deferred(self, guid, name, time = TIME_NOW):
628 testbed = self._testbed_for_guid(guid)
630 return testbed.get_deferred(guid, name, time)
631 raise RuntimeError("No element exists with guid %d" % guid)
633 def get_factory_id(self, guid):
634 testbed = self._testbed_for_guid(guid)
636 return testbed.get_factory_id(guid)
637 raise RuntimeError("No element exists with guid %d" % guid)
639 def get_testbed_id(self, guid):
640 testbed = self._testbed_for_guid(guid)
642 return testbed.testbed_id
643 raise RuntimeError("No element exists with guid %d" % guid)
645 def get_testbed_version(self, guid):
646 testbed = self._testbed_for_guid(guid)
648 return testbed.testbed_version
649 raise RuntimeError("No element exists with guid %d" % guid)
653 for testbed in self._testbeds.values():
657 exceptions.append(sys.exc_info())
658 for exc_info in exceptions:
659 raise exc_info[0], exc_info[1], exc_info[2]
661 def _testbed_for_guid(self, guid):
662 for testbed_guid in self._testbeds.keys():
663 if guid in self._guids_in_testbed(testbed_guid):
664 if testbed_guid in self._failed_testbeds:
666 return self._testbeds[testbed_guid]
669 def _guids_in_testbed(self, testbed_guid):
670 if testbed_guid not in self._testbeds:
672 if testbed_guid not in self._guids_in_testbed_cache:
673 self._guids_in_testbed_cache[testbed_guid] = \
674 set(self._testbeds[testbed_guid].guids)
675 return self._guids_in_testbed_cache[testbed_guid]
678 def _netref_component_split(component):
679 match = COMPONENT_PATTERN.match(component)
681 return match.group("kind"), match.group("index")
683 return component, None
685 _NETREF_COMPONENT_GETTERS = {
687 lambda testbed, guid, index, name:
688 testbed.get_address(guid, int(index), name),
690 lambda testbed, guid, index, name:
691 testbed.get_route(guid, int(index), name),
693 lambda testbed, guid, index, name:
694 testbed.trace(guid, index, name),
696 lambda testbed, guid, index, name:
697 testbed.get(guid, name),
700 def resolve_netref_value(self, value, failval = None):
703 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
704 label = match.group("label")
705 if label.startswith('GUID-'):
706 ref_guid = int(label[5:])
708 expr = match.group("expr")
709 component = (match.group("component") or "")[1:] # skip the dot
710 attribute = match.group("attribute")
712 # split compound components into component kind and index
713 # eg: 'addr[0]' -> ('addr', '0')
714 component, component_index = self._netref_component_split(component)
716 # find object and resolve expression
717 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
718 if component not in self._NETREF_COMPONENT_GETTERS:
719 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
720 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
723 ref_value = self._NETREF_COMPONENT_GETTERS[component](
724 ref_testbed, ref_guid, component_index, attribute)
726 value = rv = value.replace(match.group(), ref_value)
729 # unresolvable netref
736 def do_netrefs(self, data, fail_if_undefined = False):
738 for (testbed_guid, guid), attrs in self._netrefs.items():
739 testbed = self._testbeds.get(testbed_guid)
740 if testbed is not None:
741 for name in set(attrs):
742 value = testbed.get(guid, name)
743 if isinstance(value, basestring):
744 ref_value = self.resolve_netref_value(value)
745 if ref_value is not None:
746 testbed.set(guid, name, ref_value)
748 elif fail_if_undefined:
749 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
751 del self._netrefs[(testbed_guid, guid)]
754 for testbed_guid, attrs in self._testbed_netrefs.items():
755 tb_data = dict(data.get_attribute_data(testbed_guid))
757 for name in set(attrs):
758 value = tb_data.get(name)
759 if isinstance(value, basestring):
760 ref_value = self.resolve_netref_value(value)
761 if ref_value is not None:
762 data.set_attribute_data(testbed_guid, name, ref_value)
764 elif fail_if_undefined:
765 raise ValueError, "Unresolvable netref in: %r" % (value,)
767 del self._testbed_netrefs[testbed_guid]
770 def _init_testbed_controllers(self, data, recover = False):
771 blacklist_testbeds = set(self._testbeds)
772 element_guids = list()
774 data_guids = data.guids
778 # gather label associations
779 for guid in data_guids:
780 if not data.is_testbed_data(guid):
781 (testbed_guid, factory_id) = data.get_box_data(guid)
782 label = data.get_attribute_data(guid, "label")
783 if label is not None:
784 if label in label_guids:
785 raise RuntimeError, "Label %r is not unique" % (label,)
786 label_guids[label] = guid
788 # create testbed controllers
789 for guid in data_guids:
790 if data.is_testbed_data(guid):
791 if guid not in self._testbeds:
793 self._create_testbed_controller(
794 guid, data, element_guids, recover)
797 blacklist_testbeds.add(guid)
802 policy = self._testbed_recovery_policy(guid, data=data)
803 if policy == DC.POLICY_RECOVER:
804 self._create_testbed_controller(
805 guid, data, element_guids, False)
807 elif policy == DC.POLICY_RESTART:
808 self._create_testbed_controller(
809 guid, data, element_guids, False)
813 self._failed_testbeds.add(guid)
817 # queue programmable elements
818 # - that have not been programmed already (blacklist_testbeds)
819 # - including recovered or restarted testbeds
820 # - but those that have no unresolved netrefs
821 for guid in data_guids:
822 if not data.is_testbed_data(guid):
823 (testbed_guid, factory_id) = data.get_box_data(guid)
824 if testbed_guid not in blacklist_testbeds:
825 element_guids.append(guid)
827 # replace references to elements labels for its guid
828 self._resolve_labels(data, data_guids, label_guids)
830 # program testbed controllers
832 self._program_testbed_controllers(element_guids, data)
834 return to_recover, to_restart
836 def _resolve_labels(self, data, data_guids, label_guids):
837 netrefs = self._netrefs
838 testbed_netrefs = self._testbed_netrefs
839 for guid in data_guids:
840 for name, value in data.get_attribute_data(guid):
841 if isinstance(value, basestring):
843 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
844 label = match.group("label")
845 if not label.startswith('GUID-'):
846 ref_guid = label_guids.get(label)
847 if ref_guid is not None:
848 value = ATTRIBUTE_PATTERN_BASE.sub(
849 ATTRIBUTE_PATTERN_GUID_SUB % dict(
850 guid = 'GUID-%d' % (ref_guid,),
851 expr = match.group("expr"),
854 data.set_attribute_data(guid, name, value)
856 # memorize which guid-attribute pairs require
857 # postprocessing, to avoid excessive controller-testbed
858 # communication at configuration time
859 # (which could require high-latency network I/O)
860 if not data.is_testbed_data(guid):
861 (testbed_guid, factory_id) = data.get_box_data(guid)
862 netrefs[(testbed_guid, guid)].add(name)
864 testbed_netrefs[guid].add(name)
870 def _create_testbed_controller(self, guid, data, element_guids, recover):
871 (testbed_id, testbed_version) = data.get_testbed_data(guid)
872 deployment_config = self._deployment_config.get(guid)
874 # deferred import because proxy needs
875 # our class definitions to define proxies
876 import nepi.util.proxy as proxy
878 if deployment_config is None:
880 deployment_config = proxy.AccessConfiguration()
882 for (name, value) in data.get_attribute_data(guid):
883 if value is not None and deployment_config.has_attribute(name):
884 # if any deployment config attribute has a netref, we can't
885 # create this controller yet
886 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
887 # remember to re-issue this one
888 self._netreffed_testbeds.add(guid)
891 # copy deployment config attribute
892 deployment_config.set_attribute_value(name, value)
895 self._deployment_config[guid] = deployment_config
897 if deployment_config is not None:
898 # force recovery mode
899 deployment_config.set_attribute_value("recover",recover)
901 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
903 for (name, value) in data.get_attribute_data(guid):
904 testbed.defer_configure(name, value)
905 self._testbeds[guid] = testbed
906 if guid in self._netreffed_testbeds:
907 self._netreffed_testbeds.remove(guid)
909 def _program_testbed_controllers(self, element_guids, data):
910 def resolve_create_netref(data, guid, name, value):
911 # Try to resolve create-time netrefs, if possible
912 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
914 nuvalue = self.resolve_netref_value(value)
916 # Any trouble means we're not in shape to resolve the netref yet
918 if nuvalue is not None:
919 # Only if we succeed we remove the netref deferral entry
921 data.set_attribute_data(guid, name, value)
922 if (testbed_guid, guid) in self._netrefs:
923 self._netrefs[(testbed_guid, guid)].discard(name)
926 for guid in element_guids:
927 (testbed_guid, factory_id) = data.get_box_data(guid)
928 testbed = self._testbeds.get(testbed_guid)
929 if testbed is not None:
931 testbed.defer_create(guid, factory_id)
933 for (name, value) in data.get_attribute_data(guid):
934 value = resolve_create_netref(data, guid, name, value)
935 testbed.defer_create_set(guid, name, value)
937 for guid in element_guids:
938 (testbed_guid, factory_id) = data.get_box_data(guid)
939 testbed = self._testbeds.get(testbed_guid)
940 if testbed is not None:
942 for trace_id in data.get_trace_data(guid):
943 testbed.defer_add_trace(guid, trace_id)
945 for (address, netprefix, broadcast) in data.get_address_data(guid):
947 testbed.defer_add_address(guid, address, netprefix,
950 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
951 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
952 # store connections data
953 for (connector_type_name, other_guid, other_connector_type_name) \
954 in data.get_connection_data(guid):
955 (other_testbed_guid, other_factory_id) = data.get_box_data(
957 if testbed_guid == other_testbed_guid:
958 # each testbed should take care of enforcing internal
959 # connection simmetry, so each connection is only
960 # added in one direction
961 testbed.defer_connect(guid, connector_type_name,
962 other_guid, other_connector_type_name)
964 def _program_testbed_cross_connections(self, data):
965 data_guids = data.guids
966 for guid in data_guids:
967 if not data.is_testbed_data(guid):
968 (testbed_guid, factory_id) = data.get_box_data(guid)
969 testbed = self._testbeds.get(testbed_guid)
970 if testbed is not None:
971 for (connector_type_name, cross_guid, cross_connector_type_name) \
972 in data.get_connection_data(guid):
973 (testbed_guid, factory_id) = data.get_box_data(guid)
974 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
976 if testbed_guid != cross_testbed_guid:
977 cross_testbed = self._testbeds[cross_testbed_guid]
978 cross_testbed_id = cross_testbed.testbed_id
979 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
980 cross_testbed_guid, cross_testbed_id, cross_factory_id,
981 cross_connector_type_name)
982 # save cross data for later
983 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
    """Record that ``testbed_guid`` is cross-connected to ``cross_guid``.

    The bookkeeping is a two-level mapping:
    testbed guid -> cross testbed guid -> set of cross element guids.
    ``guid`` is accepted but not stored.
    """
    per_testbed = self._cross_data.setdefault(testbed_guid, dict())
    per_testbed.setdefault(cross_testbed_guid, set()).add(cross_guid)
993 def _get_cross_data(self, testbed_guid):
995 if not testbed_guid in self._cross_data:
998 # fetch attribute lists in one batch
999 attribute_lists = dict()
1000 for cross_testbed_guid, guid_list in \
1001 self._cross_data[testbed_guid].iteritems():
1002 cross_testbed = self._testbeds[cross_testbed_guid]
1003 for cross_guid in guid_list:
1004 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1005 cross_testbed.get_attribute_list_deferred(cross_guid)
1007 # fetch attribute values in another batch
1008 for cross_testbed_guid, guid_list in \
1009 self._cross_data[testbed_guid].iteritems():
1010 cross_data[cross_testbed_guid] = dict()
1011 cross_testbed = self._testbeds[cross_testbed_guid]
1012 for cross_guid in guid_list:
1013 elem_cross_data = dict(
1015 _testbed_guid = cross_testbed_guid,
1016 _testbed_id = cross_testbed.testbed_id,
1017 _testbed_version = cross_testbed.testbed_version)
1018 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1019 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1020 for attr_name in attribute_list:
1021 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1022 elem_cross_data[attr_name] = attr_value
1024 # undefer all values - we'll have to serialize them probably later
1025 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1026 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1027 for attr_name, attr_value in elem_cross_data.iteritems():
1028 elem_cross_data[attr_name] = _undefer(attr_value)