2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
17 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
18 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
19 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
21 def _undefer(deferred):
22 if hasattr(deferred, '_get'):
23 return deferred._get()
28 class TestbedController(object):
29 def __init__(self, testbed_id, testbed_version):
30 self._testbed_id = testbed_id
31 self._testbed_version = testbed_version
35 return self._testbed_id
38 def testbed_version(self):
39 return self._testbed_version
43 raise NotImplementedError
45 def defer_configure(self, name, value):
46 """Instructs setting a configuartion attribute for the testbed instance"""
47 raise NotImplementedError
49 def defer_create(self, guid, factory_id):
50 """Instructs creation of element """
51 raise NotImplementedError
53 def defer_create_set(self, guid, name, value):
54 """Instructs setting an initial attribute on an element"""
55 raise NotImplementedError
57 def defer_factory_set(self, guid, name, value):
58 """Instructs setting an attribute on a factory"""
59 raise NotImplementedError
61 def defer_connect(self, guid1, connector_type_name1, guid2,
62 connector_type_name2):
63 """Instructs creation of a connection between the given connectors"""
64 raise NotImplementedError
66 def defer_cross_connect(self,
67 guid, connector_type_name,
68 cross_guid, cross_testbed_guid,
69 cross_testbed_id, cross_factory_id,
70 cross_connector_type_name):
72 Instructs creation of a connection between the given connectors
73 of different testbed instances
75 raise NotImplementedError
77 def defer_add_trace(self, guid, trace_id):
78 """Instructs the addition of a trace"""
79 raise NotImplementedError
81 def defer_add_address(self, guid, address, netprefix, broadcast):
82 """Instructs the addition of an address"""
83 raise NotImplementedError
85 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
86 """Instructs the addition of a route"""
87 raise NotImplementedError
90 """After do_setup the testbed initial configuration is done"""
91 raise NotImplementedError
95 After do_create all instructed elements are created and
98 raise NotImplementedError
100 def do_connect_init(self):
102 After do_connect_init all internal connections between testbed elements
105 raise NotImplementedError
107 def do_connect_compl(self):
109 After do_connect all internal connections between testbed elements
112 raise NotImplementedError
114 def do_preconfigure(self):
116 Done just before resolving netrefs, after connection, before cross connections,
117 useful for early stages of configuration, for setting up stuff that might be
118 required for netref resolution.
120 raise NotImplementedError
122 def do_configure(self):
123 """After do_configure elements are configured"""
124 raise NotImplementedError
126 def do_prestart(self):
127 """Before do_start elements are prestart-configured"""
128 raise NotImplementedError
130 def do_cross_connect_init(self, cross_data):
132 After do_cross_connect_init initiation of all external connections
133 between different testbed elements is performed
135 raise NotImplementedError
137 def do_cross_connect_compl(self, cross_data):
139 After do_cross_connect_compl completion of all external connections
140 between different testbed elements is performed
142 raise NotImplementedError
145 raise NotImplementedError
148 raise NotImplementedError
152 On testbed recovery (if recovery is a supported policy), the controller
153 instance will be re-created and the following sequence invoked:
156 defer_X - programming the testbed with persisted execution values
157 (not design values). Execution values (ExecImmutable attributes)
158 should be enough to recreate the testbed's state.
160 <cross-connection methods>
162 Start will not be called, and after cross connection invocations,
163 the testbed is supposed to be fully functional again.
165 raise NotImplementedError
167 def set(self, guid, name, value, time = TIME_NOW):
168 raise NotImplementedError
170 def get(self, guid, name, time = TIME_NOW):
171 raise NotImplementedError
173 def get_route(self, guid, index, attribute):
177 guid: guid of box to query
178 index: number of routing entry to fetch
179 attribute: one of Destination, NextHop, NetPrefix
181 raise NotImplementedError
183 def get_address(self, guid, index, attribute='Address'):
187 guid: guid of box to query
188 index: number of inteface to select
189 attribute: one of Address, NetPrefix, Broadcast
191 raise NotImplementedError
193 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
194 raise NotImplementedError
196 def get_factory_id(self, guid):
197 raise NotImplementedError
199 def action(self, time, guid, action):
200 raise NotImplementedError
202 def status(self, guid):
203 raise NotImplementedError
205 def testbed_status(self):
206 raise NotImplementedError
208 def trace(self, guid, trace_id, attribute='value'):
209 raise NotImplementedError
211 def traces_info(self):
212 """ dictionary of dictionaries:
218 filesize = size in bytes,
222 raise NotImplementedError
225 raise NotImplementedError
227 class ExperimentController(object):
228 def __init__(self, experiment_xml, root_dir):
229 self._experiment_design_xml = experiment_xml
230 self._experiment_execute_xml = None
231 self._testbeds = dict()
232 self._deployment_config = dict()
233 self._netrefs = collections.defaultdict(set)
234 self._testbed_netrefs = collections.defaultdict(set)
235 self._cross_data = dict()
236 self._root_dir = root_dir
237 self._netreffed_testbeds = set()
238 self._guids_in_testbed_cache = dict()
239 self._failed_testbeds = set()
240 self._started_time = None
241 self._stopped_time = None
243 if experiment_xml is None and root_dir is not None:
245 self.load_experiment_xml()
246 self.load_execute_xml()
248 self.persist_experiment_xml()
251 def experiment_design_xml(self):
252 return self._experiment_design_xml
255 def experiment_execute_xml(self):
256 return self._experiment_execute_xml
259 def started_time(self):
260 return self._started_time
263 def stopped_time(self):
264 return self._stopped_time
269 for testbed_guid in self._testbeds.keys():
270 _guids = self._guids_in_testbed(testbed_guid)
275 def persist_experiment_xml(self):
276 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
277 f = open(xml_path, "w")
278 f.write(self._experiment_design_xml)
281 def persist_execute_xml(self):
282 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
283 f = open(xml_path, "w")
284 f.write(self._experiment_execute_xml)
287 def load_experiment_xml(self):
288 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
289 f = open(xml_path, "r")
290 self._experiment_design_xml = f.read()
293 def load_execute_xml(self):
294 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
295 f = open(xml_path, "r")
296 self._experiment_execute_xml = f.read()
299 def trace(self, guid, trace_id, attribute='value'):
300 testbed = self._testbed_for_guid(guid)
302 return testbed.trace(guid, trace_id, attribute)
303 raise RuntimeError("No element exists with guid %d" % guid)
305 def traces_info(self):
307 for guid, testbed in self._testbeds.iteritems():
308 tinfo = testbed.traces_info()
310 traces_info[guid] = testbed.traces_info()
314 def _parallel(callables):
317 @functools.wraps(callable)
318 def wrapped(*p, **kw):
323 traceback.print_exc(file=sys.stderr)
324 excs.append(sys.exc_info())
326 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
327 for thread in threads:
329 for thread in threads:
332 eTyp, eVal, eLoc = exc
333 raise eTyp, eVal, eLoc
336 self._started_time = time.time()
339 def _start(self, recover = False):
340 parser = XmlExperimentParser()
343 xml = self._experiment_execute_xml
345 xml = self._experiment_design_xml
346 data = parser.from_xml_to_data(xml)
348 # instantiate testbed controllers
349 to_recover, to_restart = self._init_testbed_controllers(data, recover)
350 all_restart = set(to_restart)
353 # persist testbed connection data, for potential recovery
354 self._persist_testbed_proxies()
356 # recover recoverable controllers
357 for guid in to_recover:
359 self._testbeds[guid].do_setup()
360 self._testbeds[guid].recover()
363 self._failed_testbeds.add(guid)
365 def steps_to_configure(self, allowed_guids):
366 # perform setup in parallel for all test beds,
367 # wait for all threads to finish
368 self._parallel([testbed.do_setup
369 for guid,testbed in self._testbeds.iteritems()
370 if guid in allowed_guids])
372 # perform create-connect in parallel, wait
373 # (internal connections only)
374 self._parallel([testbed.do_create
375 for guid,testbed in self._testbeds.iteritems()
376 if guid in allowed_guids])
378 self._parallel([testbed.do_connect_init
379 for guid,testbed in self._testbeds.iteritems()
380 if guid in allowed_guids])
382 self._parallel([testbed.do_connect_compl
383 for guid,testbed in self._testbeds.iteritems()
384 if guid in allowed_guids])
386 self._parallel([testbed.do_preconfigure
387 for guid,testbed in self._testbeds.iteritems()
388 if guid in allowed_guids])
391 steps_to_configure(self, to_restart)
393 if self._netreffed_testbeds:
394 # initally resolve netrefs
395 self.do_netrefs(data, fail_if_undefined=False)
397 # rinse and repeat, for netreffed testbeds
398 netreffed_testbeds = set(self._netreffed_testbeds)
400 to_recover, to_restart = self._init_testbed_controllers(data, recover)
401 all_restart.update(to_restart)
404 # persist testbed connection data, for potential recovery
405 self._persist_testbed_proxies()
407 # recover recoverable controllers
408 for guid in to_recover:
410 self._testbeds[guid].do_setup()
411 self._testbeds[guid].recover()
414 self._failed_testbeds.add(guid)
416 # configure dependant testbeds
417 steps_to_configure(self, to_restart)
419 all_restart = [ self._testbeds[guid] for guid in all_restart ]
421 # final netref step, fail if anything's left unresolved
422 self.do_netrefs(data, fail_if_undefined=True)
424 # Only now, that netref dependencies have been solve, it is safe to
425 # program cross_connections
426 self._program_testbed_cross_connections(data)
428 # perform do_configure in parallel for al testbeds
429 # (it's internal configuration for each)
430 self._parallel([testbed.do_configure
431 for testbed in all_restart])
435 #print >>sys.stderr, "DO IT"
439 # cross-connect (cannot be done in parallel)
440 for guid, testbed in self._testbeds.iteritems():
441 cross_data = self._get_cross_data(guid)
442 testbed.do_cross_connect_init(cross_data)
443 for guid, testbed in self._testbeds.iteritems():
444 cross_data = self._get_cross_data(guid)
445 testbed.do_cross_connect_compl(cross_data)
449 # Last chance to configure (parallel on all testbeds)
450 self._parallel([testbed.do_prestart
451 for testbed in all_restart])
456 # update execution xml with execution-specific values
457 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
458 self._update_execute_xml()
459 self.persist_execute_xml()
461 # start experiment (parallel start on all testbeds)
462 self._parallel([testbed.start
463 for testbed in all_restart])
467 def _clear_caches(self):
468 # Cleaning cache for safety.
469 self._guids_in_testbed_cache = dict()
471 def _persist_testbed_proxies(self):
472 TRANSIENT = (DC.RECOVER,)
474 # persist access configuration for all testbeds, so that
475 # recovery mode can reconnect to them if it becomes necessary
476 conf = ConfigParser.RawConfigParser()
477 for testbed_guid, testbed_config in self._deployment_config.iteritems():
478 testbed_guid = str(testbed_guid)
479 conf.add_section(testbed_guid)
480 for attr in testbed_config.get_attribute_list():
481 if attr not in TRANSIENT:
482 conf.set(testbed_guid, attr,
483 testbed_config.get_attribute_value(attr))
485 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
489 def _load_testbed_proxies(self):
491 Attribute.STRING : 'get',
492 Attribute.BOOL : 'getboolean',
493 Attribute.ENUM : 'get',
494 Attribute.DOUBLE : 'getfloat',
495 Attribute.INTEGER : 'getint',
498 TRANSIENT = (DC.RECOVER,)
500 # deferred import because proxy needs
501 # our class definitions to define proxies
502 import nepi.util.proxy as proxy
504 conf = ConfigParser.RawConfigParser()
505 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
506 for testbed_guid in conf.sections():
507 testbed_config = proxy.AccessConfiguration()
508 testbed_guid = str(testbed_guid)
509 for attr in testbed_config.get_attribute_list():
510 if attr not in TRANSIENT:
511 getter = getattr(conf, TYPEMAP.get(
512 testbed_config.get_attribute_type(attr),
514 testbed_config.set_attribute_value(
515 attr, getter(testbed_guid, attr))
517 def _unpersist_testbed_proxies(self):
519 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
521 # Just print exceptions, this is just cleanup
523 ######## BUG ##########
524 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
525 #traceback.print_exc(file=sys.stderr)
527 def _update_execute_xml(self):
529 # For all elements in testbed,
530 # - gather immutable execute-readable attribuets lists
532 # Generate new design description from design xml
533 # (Wait for attributes lists - implicit syncpoint)
535 # For all elements in testbed,
536 # - gather all immutable execute-readable attribute
537 # values, asynchronously
538 # (Wait for attribute values - implicit syncpoint)
540 # For all elements in testbed,
541 # - inject non-None values into new design
542 # Generate execute xml from new design
544 attribute_lists = dict(
545 (testbed_guid, collections.defaultdict(dict))
546 for testbed_guid in self._testbeds
549 for testbed_guid, testbed in self._testbeds.iteritems():
550 guids = self._guids_in_testbed(testbed_guid)
552 attribute_lists[testbed_guid][guid] = \
553 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
555 parser = XmlExperimentParser()
556 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
558 attribute_values = dict(
559 (testbed_guid, collections.defaultdict(dict))
560 for testbed_guid in self._testbeds
563 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
564 testbed = self._testbeds[testbed_guid]
565 for guid, attribute_list in testbed_attribute_lists.iteritems():
566 attribute_list = _undefer(attribute_list)
567 attribute_values[testbed_guid][guid] = dict(
568 (attribute, testbed.get_deferred(guid, attribute))
569 for attribute in attribute_list
572 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
573 for guid, attribute_values in testbed_attribute_values.iteritems():
574 for attribute, value in attribute_values.iteritems():
575 value = _undefer(value)
576 if value is not None:
577 execute_data.add_attribute_data(guid, attribute, value)
579 self._experiment_execute_xml = parser.to_xml(data=execute_data)
582 for testbed in self._testbeds.values():
584 self._unpersist_testbed_proxies()
585 self._stopped_time = time.time()
588 # reload perviously persisted testbed access configurations
589 self._failed_testbeds.clear()
590 self._load_testbed_proxies()
592 # re-program testbeds that need recovery
593 self._start(recover = True)
595 def is_finished(self, guid):
596 testbed = self._testbed_for_guid(guid)
598 return testbed.status(guid) == AS.STATUS_FINISHED
599 raise RuntimeError("No element exists with guid %d" % guid)
601 def _testbed_recovery_policy(self, guid, data = None):
603 parser = XmlExperimentParser()
604 data = parser.from_xml_to_data(self._experiment_design_xml)
606 return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
608 def status(self, guid):
609 if guid in self._testbeds:
611 # report testbed status
612 if guid in self._failed_testbeds:
613 return TS.STATUS_FAILED
616 return self._testbeds[guid].status()
618 return TS.STATUS_UNRESPONSIVE
621 testbed = self._testbed_for_guid(guid)
622 if testbed is not None:
623 return testbed.status(guid)
625 return AS.STATUS_UNDETERMINED
627 def set(self, guid, name, value, time = TIME_NOW):
628 testbed = self._testbed_for_guid(guid)
630 testbed.set(guid, name, value, time)
632 raise RuntimeError("No element exists with guid %d" % guid)
634 def get(self, guid, name, time = TIME_NOW):
635 testbed = self._testbed_for_guid(guid)
637 return testbed.get(guid, name, time)
638 raise RuntimeError("No element exists with guid %d" % guid)
640 def get_deferred(self, guid, name, time = TIME_NOW):
641 testbed = self._testbed_for_guid(guid)
643 return testbed.get_deferred(guid, name, time)
644 raise RuntimeError("No element exists with guid %d" % guid)
646 def get_factory_id(self, guid):
647 testbed = self._testbed_for_guid(guid)
649 return testbed.get_factory_id(guid)
650 raise RuntimeError("No element exists with guid %d" % guid)
652 def get_testbed_id(self, guid):
653 testbed = self._testbed_for_guid(guid)
655 return testbed.testbed_id
656 raise RuntimeError("No element exists with guid %d" % guid)
658 def get_testbed_version(self, guid):
659 testbed = self._testbed_for_guid(guid)
661 return testbed.testbed_version
662 raise RuntimeError("No element exists with guid %d" % guid)
666 for testbed in self._testbeds.values():
670 exceptions.append(sys.exc_info())
671 for exc_info in exceptions:
672 raise exc_info[0], exc_info[1], exc_info[2]
674 def _testbed_for_guid(self, guid):
675 for testbed_guid in self._testbeds.keys():
676 if guid in self._guids_in_testbed(testbed_guid):
677 if testbed_guid in self._failed_testbeds:
679 return self._testbeds[testbed_guid]
682 def _guids_in_testbed(self, testbed_guid):
683 if testbed_guid not in self._testbeds:
685 if testbed_guid not in self._guids_in_testbed_cache:
686 self._guids_in_testbed_cache[testbed_guid] = \
687 set(self._testbeds[testbed_guid].guids)
688 return self._guids_in_testbed_cache[testbed_guid]
691 def _netref_component_split(component):
692 match = COMPONENT_PATTERN.match(component)
694 return match.group("kind"), match.group("index")
696 return component, None
698 _NETREF_COMPONENT_GETTERS = {
700 lambda testbed, guid, index, name:
701 testbed.get_address(guid, int(index), name),
703 lambda testbed, guid, index, name:
704 testbed.get_route(guid, int(index), name),
706 lambda testbed, guid, index, name:
707 testbed.trace(guid, index, name),
709 lambda testbed, guid, index, name:
710 testbed.get(guid, name),
713 def resolve_netref_value(self, value, failval = None):
716 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
717 label = match.group("label")
718 if label.startswith('GUID-'):
719 ref_guid = int(label[5:])
721 expr = match.group("expr")
722 component = (match.group("component") or "")[1:] # skip the dot
723 attribute = match.group("attribute")
725 # split compound components into component kind and index
726 # eg: 'addr[0]' -> ('addr', '0')
727 component, component_index = self._netref_component_split(component)
729 # find object and resolve expression
730 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
731 if component not in self._NETREF_COMPONENT_GETTERS:
732 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
733 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
736 ref_value = self._NETREF_COMPONENT_GETTERS[component](
737 ref_testbed, ref_guid, component_index, attribute)
739 value = rv = value.replace(match.group(), ref_value)
742 # unresolvable netref
749 def do_netrefs(self, data, fail_if_undefined = False):
751 for (testbed_guid, guid), attrs in self._netrefs.items():
752 testbed = self._testbeds.get(testbed_guid)
753 if testbed is not None:
754 for name in set(attrs):
755 value = testbed.get(guid, name)
756 if isinstance(value, basestring):
757 ref_value = self.resolve_netref_value(value)
758 if ref_value is not None:
759 testbed.set(guid, name, ref_value)
761 elif fail_if_undefined:
762 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
764 del self._netrefs[(testbed_guid, guid)]
767 for testbed_guid, attrs in self._testbed_netrefs.items():
768 tb_data = dict(data.get_attribute_data(testbed_guid))
770 for name in set(attrs):
771 value = tb_data.get(name)
772 if isinstance(value, basestring):
773 ref_value = self.resolve_netref_value(value)
774 if ref_value is not None:
775 data.set_attribute_data(testbed_guid, name, ref_value)
777 elif fail_if_undefined:
778 raise ValueError, "Unresolvable netref in: %r" % (value,)
780 del self._testbed_netrefs[testbed_guid]
783 def _init_testbed_controllers(self, data, recover = False):
784 blacklist_testbeds = set(self._testbeds)
785 element_guids = list()
787 data_guids = data.guids
791 # gather label associations
792 for guid in data_guids:
793 if not data.is_testbed_data(guid):
794 (testbed_guid, factory_id) = data.get_box_data(guid)
795 label = data.get_attribute_data(guid, "label")
796 if label is not None:
797 if label in label_guids:
798 raise RuntimeError, "Label %r is not unique" % (label,)
799 label_guids[label] = guid
801 # create testbed controllers
802 for guid in data_guids:
803 if data.is_testbed_data(guid):
804 if guid not in self._testbeds:
806 self._create_testbed_controller(
807 guid, data, element_guids, recover)
810 blacklist_testbeds.add(guid)
815 policy = self._testbed_recovery_policy(guid, data=data)
816 if policy == DC.POLICY_RECOVER:
817 self._create_testbed_controller(
818 guid, data, element_guids, False)
820 elif policy == DC.POLICY_RESTART:
821 self._create_testbed_controller(
822 guid, data, element_guids, False)
826 self._failed_testbeds.add(guid)
830 # queue programmable elements
831 # - that have not been programmed already (blacklist_testbeds)
832 # - including recovered or restarted testbeds
833 # - but those that have no unresolved netrefs
834 for guid in data_guids:
835 if not data.is_testbed_data(guid):
836 (testbed_guid, factory_id) = data.get_box_data(guid)
837 if testbed_guid not in blacklist_testbeds:
838 element_guids.append(guid)
840 # replace references to elements labels for its guid
841 self._resolve_labels(data, data_guids, label_guids)
843 # program testbed controllers
845 self._program_testbed_controllers(element_guids, data)
847 return to_recover, to_restart
849 def _resolve_labels(self, data, data_guids, label_guids):
850 netrefs = self._netrefs
851 testbed_netrefs = self._testbed_netrefs
852 for guid in data_guids:
853 for name, value in data.get_attribute_data(guid):
854 if isinstance(value, basestring):
856 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
857 label = match.group("label")
858 if not label.startswith('GUID-'):
859 ref_guid = label_guids.get(label)
860 if ref_guid is not None:
861 value = ATTRIBUTE_PATTERN_BASE.sub(
862 ATTRIBUTE_PATTERN_GUID_SUB % dict(
863 guid = 'GUID-%d' % (ref_guid,),
864 expr = match.group("expr"),
867 data.set_attribute_data(guid, name, value)
869 # memorize which guid-attribute pairs require
870 # postprocessing, to avoid excessive controller-testbed
871 # communication at configuration time
872 # (which could require high-latency network I/O)
873 if not data.is_testbed_data(guid):
874 (testbed_guid, factory_id) = data.get_box_data(guid)
875 netrefs[(testbed_guid, guid)].add(name)
877 testbed_netrefs[guid].add(name)
883 def _create_testbed_controller(self, guid, data, element_guids, recover):
884 (testbed_id, testbed_version) = data.get_testbed_data(guid)
885 deployment_config = self._deployment_config.get(guid)
887 # deferred import because proxy needs
888 # our class definitions to define proxies
889 import nepi.util.proxy as proxy
891 if deployment_config is None:
893 deployment_config = proxy.AccessConfiguration()
895 for (name, value) in data.get_attribute_data(guid):
896 if value is not None and deployment_config.has_attribute(name):
897 # if any deployment config attribute has a netref, we can't
898 # create this controller yet
899 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
900 # remember to re-issue this one
901 self._netreffed_testbeds.add(guid)
904 # copy deployment config attribute
905 deployment_config.set_attribute_value(name, value)
908 self._deployment_config[guid] = deployment_config
910 if deployment_config is not None:
911 # force recovery mode
912 deployment_config.set_attribute_value("recover",recover)
914 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
916 for (name, value) in data.get_attribute_data(guid):
917 testbed.defer_configure(name, value)
918 self._testbeds[guid] = testbed
919 if guid in self._netreffed_testbeds:
920 self._netreffed_testbeds.remove(guid)
922 def _program_testbed_controllers(self, element_guids, data):
923 def resolve_create_netref(data, guid, name, value):
924 # Try to resolve create-time netrefs, if possible
925 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
927 nuvalue = self.resolve_netref_value(value)
929 # Any trouble means we're not in shape to resolve the netref yet
931 if nuvalue is not None:
932 # Only if we succeed we remove the netref deferral entry
934 data.set_attribute_data(guid, name, value)
935 if (testbed_guid, guid) in self._netrefs:
936 self._netrefs[(testbed_guid, guid)].discard(name)
939 for guid in element_guids:
940 (testbed_guid, factory_id) = data.get_box_data(guid)
941 testbed = self._testbeds.get(testbed_guid)
942 if testbed is not None:
944 testbed.defer_create(guid, factory_id)
946 for (name, value) in data.get_attribute_data(guid):
947 value = resolve_create_netref(data, guid, name, value)
948 testbed.defer_create_set(guid, name, value)
950 for guid in element_guids:
951 (testbed_guid, factory_id) = data.get_box_data(guid)
952 testbed = self._testbeds.get(testbed_guid)
953 if testbed is not None:
955 for trace_id in data.get_trace_data(guid):
956 testbed.defer_add_trace(guid, trace_id)
958 for (address, netprefix, broadcast) in data.get_address_data(guid):
960 testbed.defer_add_address(guid, address, netprefix,
963 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
964 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
965 # store connections data
966 for (connector_type_name, other_guid, other_connector_type_name) \
967 in data.get_connection_data(guid):
968 (other_testbed_guid, other_factory_id) = data.get_box_data(
970 if testbed_guid == other_testbed_guid:
971 # each testbed should take care of enforcing internal
972 # connection simmetry, so each connection is only
973 # added in one direction
974 testbed.defer_connect(guid, connector_type_name,
975 other_guid, other_connector_type_name)
977 def _program_testbed_cross_connections(self, data):
978 data_guids = data.guids
979 for guid in data_guids:
980 if not data.is_testbed_data(guid):
981 (testbed_guid, factory_id) = data.get_box_data(guid)
982 testbed = self._testbeds.get(testbed_guid)
983 if testbed is not None:
984 for (connector_type_name, cross_guid, cross_connector_type_name) \
985 in data.get_connection_data(guid):
986 (testbed_guid, factory_id) = data.get_box_data(guid)
987 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
989 if testbed_guid != cross_testbed_guid:
990 cross_testbed = self._testbeds[cross_testbed_guid]
991 cross_testbed_id = cross_testbed.testbed_id
992 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
993 cross_testbed_guid, cross_testbed_id, cross_factory_id,
994 cross_connector_type_name)
995 # save cross data for later
996 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
999 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1000 if testbed_guid not in self._cross_data:
1001 self._cross_data[testbed_guid] = dict()
1002 if cross_testbed_guid not in self._cross_data[testbed_guid]:
1003 self._cross_data[testbed_guid][cross_testbed_guid] = set()
1004 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1006 def _get_cross_data(self, testbed_guid):
1008 if not testbed_guid in self._cross_data:
1011 # fetch attribute lists in one batch
1012 attribute_lists = dict()
1013 for cross_testbed_guid, guid_list in \
1014 self._cross_data[testbed_guid].iteritems():
1015 cross_testbed = self._testbeds[cross_testbed_guid]
1016 for cross_guid in guid_list:
1017 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1018 cross_testbed.get_attribute_list_deferred(cross_guid)
1020 # fetch attribute values in another batch
1021 for cross_testbed_guid, guid_list in \
1022 self._cross_data[testbed_guid].iteritems():
1023 cross_data[cross_testbed_guid] = dict()
1024 cross_testbed = self._testbeds[cross_testbed_guid]
1025 for cross_guid in guid_list:
1026 elem_cross_data = dict(
1028 _testbed_guid = cross_testbed_guid,
1029 _testbed_id = cross_testbed.testbed_id,
1030 _testbed_version = cross_testbed.testbed_version)
1031 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1032 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1033 for attr_name in attribute_list:
1034 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1035 elem_cross_data[attr_name] = attr_value
1037 # undefer all values - we'll have to serialize them probably later
1038 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1039 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1040 for attr_name, attr_value in elem_cross_data.iteritems():
1041 elem_cross_data[attr_name] = _undefer(attr_value)