2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
# Regex for a netref expression embedded in an attribute value, of the form
#   {#[<label>](.addr[n] | .route[n] | .trace[n])?.[<attribute>]#}
# Named groups: "label", "expr" (everything between the label and the closing "#}"),
# "component" (the optional .addr/.route/.trace part) and "attribute".
# NOTE(review): the "." right before "\[(?P<attribute>" is not escaped, so it matches
# any single character rather than only a literal dot - confirm whether that is intended.
17 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-format template used to rebuild a netref from a "guid" label and the original "expr".
18 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into its "kind" and "index" groups.
19 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
def _undefer(deferred):
    """Return the concrete value behind *deferred*.

    Objects exposing a ``_get`` method are treated as deferred results and
    are resolved by calling it. Anything else is already a plain value and
    is handed back unchanged, so callers can apply this function to every
    value without checking first.
    """
    if hasattr(deferred, '_get'):
        return deferred._get()
    # Plain values must pass through; falling off the end would turn them
    # into None and make callers silently drop them.
    return deferred
class TestbedController(object):
    """Abstract interface that every testbed controller must implement.

    A controller is first *programmed* through the ``defer_*`` methods, then
    driven through the ``do_*`` stages, and finally started, queried and
    stopped. Every operation here raises ``NotImplementedError``; concrete
    testbeds override them.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Identifier of the testbed type handled by this controller."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Version of the testbed handled by this controller."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements known to this controller."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element """
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self,
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and
        their initial attributes are set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed elements
        are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Start the testbed."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:

            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>

        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute *name* of element *guid* to *value*."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Return the value of attribute *name* of element *guid*."""
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """Return the attribute names of element *guid*."""
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id that element *guid* was created from."""
        raise NotImplementedError

    def action(self, time, guid, action):
        raise NotImplementedError

    def status(self, guid):
        """Return the status of element *guid*."""
        raise NotImplementedError

    def testbed_status(self):
        """Return the status of the testbed as a whole."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return *attribute* of trace *trace_id* of element *guid*."""
        raise NotImplementedError

    def traces_info(self):
        """Return a dictionary of dictionaries describing the traces.

        Each per-trace entry carries, among other fields, ``filesize``
        (size in bytes).
        """
        raise NotImplementedError

    def shutdown(self):
        """Release the testbed and everything the controller holds."""
        raise NotImplementedError
227 class ExperimentController(object):
def __init__(self, experiment_xml, root_dir):
    """Create the controller for one experiment.

    experiment_xml: design XML of the experiment, or None to resume a
        previously persisted experiment found under *root_dir*.
    root_dir: directory where the experiment XML files are stored.
    """
    self._experiment_design_xml = experiment_xml
    self._experiment_execute_xml = None
    self._testbeds = dict()
    self._deployment_config = dict()
    self._netrefs = collections.defaultdict(set)
    self._testbed_netrefs = collections.defaultdict(set)
    self._cross_data = dict()
    self._root_dir = root_dir
    self._netreffed_testbeds = set()
    self._guids_in_testbed_cache = dict()
    self._failed_testbeds = set()
    self._started_time = None
    self._stopped_time = None

    if experiment_xml is None and root_dir is not None:
        # Recovery: no design was given, reload what was persisted earlier.
        self.load_experiment_xml()
        self.load_execute_xml()
    else:
        # Fresh experiment: store the design so it can be recovered later.
        # Must not run in the recovery case, it would overwrite the file.
        self.persist_experiment_xml()
@property
def experiment_design_xml(self):
    """Design-time XML description of the experiment."""
    return self._experiment_design_xml
@property
def experiment_execute_xml(self):
    """Execution-time XML description of the experiment (None until set)."""
    return self._experiment_execute_xml
@property
def started_time(self):
    """Timestamp recorded when the experiment was started, or None."""
    return self._started_time
@property
def stopped_time(self):
    """Timestamp recorded when the experiment was stopped, or None."""
    return self._stopped_time
# NOTE(review): the enclosing "def" line, the accumulator set-up and the return
# statement of this member are not visible in this listing.
# Visit every known testbed controller and fetch the set of element guids it owns
# (via the cached lookup in _guids_in_testbed).
269 for testbed_guid in self._testbeds.keys():
270 _guids = self._guids_in_testbed(testbed_guid)
def persist_experiment_xml(self):
    """Write the design XML to ``<root_dir>/experiment-design.xml``."""
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    # "with" closes (and flushes) the file even if write() raises.
    with open(xml_path, "w") as f:
        f.write(self._experiment_design_xml)
def persist_execute_xml(self):
    """Write the execution XML to ``<root_dir>/experiment-execute.xml``."""
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    # "with" closes (and flushes) the file even if write() raises.
    with open(xml_path, "w") as f:
        f.write(self._experiment_execute_xml)
def load_experiment_xml(self):
    """Load the design XML from ``<root_dir>/experiment-design.xml``."""
    xml_path = os.path.join(self._root_dir, "experiment-design.xml")
    # "with" releases the file handle even if read() raises.
    with open(xml_path, "r") as f:
        self._experiment_design_xml = f.read()
def load_execute_xml(self):
    """Load the execution XML from ``<root_dir>/experiment-execute.xml``."""
    xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
    # "with" releases the file handle even if read() raises.
    with open(xml_path, "r") as f:
        self._experiment_execute_xml = f.read()
def trace(self, guid, trace_id, attribute='value'):
    """Return *attribute* of trace *trace_id* for element *guid*.

    The request is forwarded to the testbed controller that owns *guid*.

    Raises RuntimeError when no usable testbed owns *guid*.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        return testbed.trace(guid, trace_id, attribute)
    raise RuntimeError("No element exists with guid %d" % guid)
def traces_info(self):
    """Collect trace information from every testbed.

    Returns a dict mapping testbed guid to whatever that testbed's
    ``traces_info()`` reported; testbeds reporting nothing are left out.
    """
    traces_info = dict()
    for guid, testbed in self._testbeds.iteritems():
        # Query each testbed only once and reuse the answer: the call may
        # be a remote round-trip, and a second call could return something
        # different from what was just checked.
        tinfo = testbed.traces_info()
        if tinfo:
            traces_info[guid] = tinfo
    return traces_info
# Runs each callable of "callables" in its own thread.
# It is invoked elsewhere in this file as self._parallel([...]) although the signature
# takes no "self" - NOTE(review): presumably declared as a staticmethod on a line that
# is not visible in this listing; confirm.
314 def _parallel(callables):
# Wrapper applied to every callable (functools.wraps keeps its metadata). The visible
# lines print the current traceback to stderr and store sys.exc_info() in "excs";
# NOTE(review): the try/except lines surrounding them are not visible here.
317 @functools.wraps(callable)
318 def wrapped(*p, **kw):
323 traceback.print_exc(file=sys.stderr)
324 excs.append(sys.exc_info())
# One thread per callable, each targeting the wrapped version.
326 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
# NOTE(review): the bodies of both loops are not visible; presumably the first starts
# the threads and the second joins them - confirm.
327 for thread in threads:
329 for thread in threads:
# Re-raise a recorded exception together with its original traceback
# (Python 2 three-argument raise statement).
332 eTyp, eVal, eLoc = exc
333 raise eTyp, eVal, eLoc
# Record the wall-clock time at which the experiment was started.
# NOTE(review): the enclosing "def" line and the rest of this member are not visible
# in this listing.
336 self._started_time = time.time()
# Brings the whole experiment up: creates and programs the testbed controllers, drives
# them through the configuration stages, cross-connects them and finally starts them.
# "recover" is forwarded to _init_testbed_controllers.
# NOTE(review): several short lines (if/else/try/except, loop headers) are missing from
# this listing, so the exact nesting of the statements below cannot be confirmed here.
339 def _start(self, recover = False):
340 parser = XmlExperimentParser()
# The XML to parse is either the execution XML or the design XML
# (NOTE(review): presumably selected by "recover"; the condition is not visible).
343 xml = self._experiment_execute_xml
345 xml = self._experiment_design_xml
346 data = parser.from_xml_to_data(xml)
348 # instantiate testbed controllers
349 to_recover, to_restart = self._init_testbed_controllers(data, recover)
# all_restart accumulates the guids to restart over both controller-creation passes.
350 all_restart = set(to_restart)
353 # persist testbed connection data, for potential recovery
354 self._persist_testbed_proxies()
356 # recover recoverable controllers
357 for guid in to_recover:
# NOTE(review): presumably the guid is only added to _failed_testbeds when
# do_setup()/recover() fails; the try/except lines are not visible.
359 self._testbeds[guid].do_setup()
360 self._testbeds[guid].recover()
363 self._failed_testbeds.add(guid)
# Local helper: runs the stages do_setup, do_create, do_connect_init, do_connect_compl
# and do_preconfigure, each stage in parallel over the testbeds in "allowed_guids".
365 def steps_to_configure(self, allowed_guids):
366 # perform setup in parallel for all test beds,
367 # wait for all threads to finish
368 self._parallel([testbed.do_setup
369 for guid,testbed in self._testbeds.iteritems()
370 if guid in allowed_guids])
372 # perform create-connect in parallel, wait
373 # (internal connections only)
374 self._parallel([testbed.do_create
375 for guid,testbed in self._testbeds.iteritems()
376 if guid in allowed_guids])
378 self._parallel([testbed.do_connect_init
379 for guid,testbed in self._testbeds.iteritems()
380 if guid in allowed_guids])
382 self._parallel([testbed.do_connect_compl
383 for guid,testbed in self._testbeds.iteritems()
384 if guid in allowed_guids])
386 self._parallel([testbed.do_preconfigure
387 for guid,testbed in self._testbeds.iteritems()
388 if guid in allowed_guids])
391 steps_to_configure(self, to_restart)
# Second pass for testbeds whose creation was postponed because their deployment
# configuration contained netrefs (they are tracked in _netreffed_testbeds).
393 if self._netreffed_testbeds:
394 # initally resolve netrefs
395 self.do_netrefs(data, fail_if_undefined=False)
397 # rinse and repeat, for netreffed testbeds
398 netreffed_testbeds = set(self._netreffed_testbeds)
400 to_recover, to_restart = self._init_testbed_controllers(data, recover)
401 all_restart.update(to_restart)
404 # persist testbed connection data, for potential recovery
405 self._persist_testbed_proxies()
407 # recover recoverable controllers
408 for guid in to_recover:
410 self._testbeds[guid].do_setup()
411 self._testbeds[guid].recover()
414 self._failed_testbeds.add(guid)
416 # configure dependant testbeds
417 steps_to_configure(self, to_restart)
# From here on all_restart holds controller objects instead of guids.
419 all_restart = [ self._testbeds[guid] for guid in all_restart ]
# NOTE(review): the comment below says "fail if anything's left unresolved" but the call
# passes fail_if_undefined=False; the failing call is the one made after do_prestart.
421 # final netref step, fail if anything's left unresolved
422 self.do_netrefs(data, fail_if_undefined=False)
424 # Only now, that netref dependencies have been solve, it is safe to
425 # program cross_connections
426 self._program_testbed_cross_connections(data)
428 # perform do_configure in parallel for al testbeds
429 # (it's internal configuration for each)
430 self._parallel([testbed.do_configure
431 for testbed in all_restart])
435 #print >>sys.stderr, "DO IT"
# Cross connections run sequentially: first the init step on every testbed, then the
# completion step on every testbed.
439 # cross-connect (cannot be done in parallel)
440 for guid, testbed in self._testbeds.iteritems():
441 cross_data = self._get_cross_data(guid)
442 testbed.do_cross_connect_init(cross_data)
443 for guid, testbed in self._testbeds.iteritems():
444 cross_data = self._get_cross_data(guid)
445 testbed.do_cross_connect_compl(cross_data)
449 # Last chance to configure (parallel on all testbeds)
450 self._parallel([testbed.do_prestart
451 for testbed in all_restart])
453 # final netref step, fail if anything's left unresolved
454 self.do_netrefs(data, fail_if_undefined=True)
459 # update execution xml with execution-specific values
460 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
461 self._update_execute_xml()
462 self.persist_execute_xml()
464 # start experiment (parallel start on all testbeds)
465 self._parallel([testbed.start
466 for testbed in all_restart])
def _clear_caches(self):
    """Forget the memoised testbed-guid -> element-guids mapping."""
    # Replace the cache with a brand new empty dict, so that membership
    # data is looked up again on the next query.
    self._guids_in_testbed_cache = {}
def _persist_testbed_proxies(self):
    """Save the access configuration of every testbed to disk.

    Writes ``<root_dir>/deployment_config.ini`` with one section per
    testbed guid, so that recovery mode can reconnect to the testbeds if
    it becomes necessary.
    """
    # Attributes that only make sense for the current run are not saved.
    TRANSIENT = (DC.RECOVER,)

    conf = ConfigParser.RawConfigParser()
    for testbed_guid, testbed_config in self._deployment_config.iteritems():
        testbed_guid = str(testbed_guid)
        conf.add_section(testbed_guid)
        for attr in testbed_config.get_attribute_list():
            if attr not in TRANSIENT:
                conf.set(testbed_guid, attr,
                    testbed_config.get_attribute_value(attr))

    # "with" makes sure the file is flushed and closed even on error.
    with open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w') as f:
        conf.write(f)
# Reads <root_dir>/deployment_config.ini and rebuilds one proxy.AccessConfiguration
# per section (sections are testbed guids, as written by _persist_testbed_proxies).
# NOTE(review): in the visible lines the rebuilt testbed_config is never stored anywhere
# (e.g. in self._deployment_config) - confirm against the complete file.
492 def _load_testbed_proxies(self):
# Maps an attribute type to the name of the ConfigParser getter used to read it.
# NOTE(review): the opening and closing lines of this mapping are not visible; it is
# referenced below as TYPEMAP.
494 Attribute.STRING : 'get',
495 Attribute.BOOL : 'getboolean',
496 Attribute.ENUM : 'get',
497 Attribute.DOUBLE : 'getfloat',
498 Attribute.INTEGER : 'getint',
# Attributes listed here are skipped when loading.
501 TRANSIENT = (DC.RECOVER,)
503 # deferred import because proxy needs
504 # our class definitions to define proxies
505 import nepi.util.proxy as proxy
507 conf = ConfigParser.RawConfigParser()
508 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
509 for testbed_guid in conf.sections():
510 testbed_config = proxy.AccessConfiguration()
511 testbed_guid = str(testbed_guid)
512 for attr in testbed_config.get_attribute_list():
513 if attr not in TRANSIENT:
# Pick the getter matching the attribute type
# (NOTE(review): the fallback argument of TYPEMAP.get is not visible).
514 getter = getattr(conf, TYPEMAP.get(
515 testbed_config.get_attribute_type(attr),
517 testbed_config.set_attribute_value(
518 attr, getter(testbed_guid, attr))
# Deletes <root_dir>/deployment_config.ini, the file written by _persist_testbed_proxies.
# NOTE(review): the try/except lines around the removal are not visible in this listing;
# the comments below indicate that errors are tolerated because this is only cleanup.
520 def _unpersist_testbed_proxies(self):
522 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
524 # Just print exceptions, this is just cleanup
526 ######## BUG ##########
527 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
528 #traceback.print_exc(file=sys.stderr)
# Builds self._experiment_execute_xml: starts from the parsed design XML and adds the
# non-None values of the ExecImmutable attributes reported by the testbeds.
# Attribute lists and values are requested through the *_deferred calls and only
# resolved later with _undefer, as laid out in the plan below.
# NOTE(review): some short lines (closing parentheses, a loop header) are missing from
# this listing.
530 def _update_execute_xml(self):
532 # For all elements in testbed,
533 # - gather immutable execute-readable attribuets lists
535 # Generate new design description from design xml
536 # (Wait for attributes lists - implicit syncpoint)
538 # For all elements in testbed,
539 # - gather all immutable execute-readable attribute
540 # values, asynchronously
541 # (Wait for attribute values - implicit syncpoint)
543 # For all elements in testbed,
544 # - inject non-None values into new design
545 # Generate execute xml from new design
# Step 1: per testbed, per element guid, request the list of ExecImmutable attributes.
547 attribute_lists = dict(
548 (testbed_guid, collections.defaultdict(dict))
549 for testbed_guid in self._testbeds
552 for testbed_guid, testbed in self._testbeds.iteritems():
553 guids = self._guids_in_testbed(testbed_guid)
555 attribute_lists[testbed_guid][guid] = \
556 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
# Step 2: parse the design XML; it is the base the execution values are added to.
558 parser = XmlExperimentParser()
559 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
# Step 3: resolve each attribute list and request the value of every attribute in it.
561 attribute_values = dict(
562 (testbed_guid, collections.defaultdict(dict))
563 for testbed_guid in self._testbeds
566 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
567 testbed = self._testbeds[testbed_guid]
568 for guid, attribute_list in testbed_attribute_lists.iteritems():
569 attribute_list = _undefer(attribute_list)
570 attribute_values[testbed_guid][guid] = dict(
571 (attribute, testbed.get_deferred(guid, attribute))
572 for attribute in attribute_list
# Step 4: resolve the values and add every non-None one to the execution data.
# NOTE(review): the inner loop variable below reuses the name "attribute_values" and so
# rebinds the outer dictionary while it is being iterated - confirm this is harmless.
575 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
576 for guid, attribute_values in testbed_attribute_values.iteritems():
577 for attribute, value in attribute_values.iteritems():
578 value = _undefer(value)
579 if value is not None:
580 execute_data.add_attribute_data(guid, attribute, value)
582 self._experiment_execute_xml = parser.to_xml(data=execute_data)
# NOTE(review): the enclosing "def" line and the body of the loop are not visible in
# this listing.
# Visits every testbed controller, then removes the persisted access configuration and
# records the wall-clock time at which the experiment was stopped.
585 for testbed in self._testbeds.values():
587 self._unpersist_testbed_proxies()
588 self._stopped_time = time.time()
# NOTE(review): the enclosing "def" line is not visible in this listing.
# Forgets which testbeds were marked as failed, reloads the persisted access
# configurations and re-runs the start sequence in recovery mode.
591 # reload perviously persisted testbed access configurations
592 self._failed_testbeds.clear()
593 self._load_testbed_proxies()
595 # re-program testbeds that need recovery
596 self._start(recover = True)
def is_finished(self, guid):
    """Tell whether element *guid* reports the FINISHED status.

    Raises RuntimeError when no usable testbed owns *guid*.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        return testbed.status(guid) == AS.STATUS_FINISHED
    raise RuntimeError("No element exists with guid %d" % guid)
def _testbed_recovery_policy(self, guid, data = None):
    """Return the recovery policy attribute configured for testbed *guid*.

    data: already parsed experiment data; when omitted, the design XML is
        parsed to obtain it.
    """
    if data is None:
        # Only parse when the caller did not hand in parsed data already.
        parser = XmlExperimentParser()
        data = parser.from_xml_to_data(self._experiment_design_xml)

    return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
def status(self, guid):
    """Return the status of *guid*, which may be a testbed or an element.

    Testbeds: STATUS_FAILED if marked as failed, STATUS_UNRESPONSIVE if
    querying the controller raises, otherwise what the controller reports.
    Elements: what the owning testbed reports, or STATUS_UNDETERMINED when
    no usable testbed owns the element.
    """
    if guid in self._testbeds:
        # guid is a testbed
        # report testbed status
        if guid in self._failed_testbeds:
            return TS.STATUS_FAILED
        try:
            # NOTE(review): called without arguments, whereas the
            # TestbedController interface declares status(guid) and a
            # separate testbed_status() - confirm which one is intended.
            return self._testbeds[guid].status()
        except Exception:
            # Any failure to query the controller counts as unresponsive.
            return TS.STATUS_UNRESPONSIVE

    # guid is an element
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        return testbed.status(guid)
    return AS.STATUS_UNDETERMINED
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute *name* of element *guid* to *value*.

    The request is forwarded to the testbed controller that owns *guid*.

    Raises RuntimeError when no usable testbed owns *guid*.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        testbed.set(guid, name, value, time)
    else:
        raise RuntimeError("No element exists with guid %d" % guid)
def get(self, guid, name, time = TIME_NOW):
    """Return the value of attribute *name* of element *guid*.

    The request is forwarded to the testbed controller that owns *guid*.

    Raises RuntimeError when no usable testbed owns *guid*.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        return testbed.get(guid, name, time)
    raise RuntimeError("No element exists with guid %d" % guid)
def get_deferred(self, guid, name, time = TIME_NOW):
    """Return what the owning testbed's ``get_deferred`` yields for *name*.

    Raises RuntimeError when no usable testbed owns *guid*.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        return testbed.get_deferred(guid, name, time)
    raise RuntimeError("No element exists with guid %d" % guid)
def get_factory_id(self, guid):
    """Return the factory id of element *guid*, as reported by its testbed.

    Raises RuntimeError when no usable testbed owns *guid*.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        return testbed.get_factory_id(guid)
    raise RuntimeError("No element exists with guid %d" % guid)
def get_testbed_id(self, guid):
    """Return the ``testbed_id`` of the testbed that owns element *guid*.

    Raises RuntimeError when no usable testbed owns *guid*.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        return testbed.testbed_id
    raise RuntimeError("No element exists with guid %d" % guid)
def get_testbed_version(self, guid):
    """Return the ``testbed_version`` of the testbed that owns element *guid*.

    Raises RuntimeError when no usable testbed owns *guid*.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is not None:
        return testbed.testbed_version
    raise RuntimeError("No element exists with guid %d" % guid)
# NOTE(review): the enclosing "def" line, the body of the first loop and the
# try/except lines are not visible in this listing.
# Visits every testbed controller; sys.exc_info() of a failure is stored in
# "exceptions" so that the remaining testbeds are still visited.
669 for testbed in self._testbeds.values():
673 exceptions.append(sys.exc_info())
# Re-raise a stored exception with its original traceback (Python 2 three-argument
# raise); only the first stored one can actually propagate.
674 for exc_info in exceptions:
675 raise exc_info[0], exc_info[1], exc_info[2]
def _testbed_for_guid(self, guid):
    """Return the controller of the testbed that owns element *guid*.

    Returns None when no testbed owns *guid*, or when the owning testbed
    has been marked as failed.
    """
    for testbed_guid in self._testbeds.keys():
        if guid in self._guids_in_testbed(testbed_guid):
            if testbed_guid in self._failed_testbeds:
                # A failed testbed must not be handed out for use.
                return None
            return self._testbeds[testbed_guid]
    return None
def _guids_in_testbed(self, testbed_guid):
    """Return the set of element guids owned by testbed *testbed_guid*.

    The set is read from the controller's ``guids`` once and then served
    from ``_guids_in_testbed_cache``. Unknown testbeds yield an empty set.
    """
    if testbed_guid not in self._testbeds:
        return set()
    if testbed_guid not in self._guids_in_testbed_cache:
        self._guids_in_testbed_cache[testbed_guid] = \
            set(self._testbeds[testbed_guid].guids)
    return self._guids_in_testbed_cache[testbed_guid]
@staticmethod
def _netref_component_split(component):
    """Split a compound netref component into ``(kind, index)``.

    For example ``'addr[0]'`` becomes ``('addr', '0')``. A component that
    does not have the ``kind[index]`` form is returned as
    ``(component, None)``.
    """
    match = COMPONENT_PATTERN.match(component)
    if match:
        return match.group("kind"), match.group("index")
    # No "[index]" part (e.g. the empty component): avoid calling .group()
    # on None and report the component as-is.
    return component, None
# Dispatch table used to resolve a netref: maps the component kind to a
# callable (testbed, guid, index, name) that fetches the referenced value.
# The empty kind stands for a plain attribute reference without component.
_NETREF_COMPONENT_GETTERS = {
    'addr':
        lambda testbed, guid, index, name:
            testbed.get_address(guid, int(index), name),
    'route':
        lambda testbed, guid, index, name:
            testbed.get_route(guid, int(index), name),
    'trace':
        # the trace index is handed over as-is (not converted to int)
        lambda testbed, guid, index, name:
            testbed.trace(guid, index, name),
    '':
        lambda testbed, guid, index, name:
            testbed.get(guid, name),
}
# Replaces the netref expressions found in "value" (see ATTRIBUTE_PATTERN_BASE) by the
# values they refer to, which are fetched from the testbed that owns the element.
# NOTE(review): several short lines are missing from this listing (among them the
# return statements and the handling of "failval"), so the exact result for
# unresolvable netrefs cannot be confirmed from here.
716 def resolve_netref_value(self, value, failval = None):
719 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
720 label = match.group("label")
# Only labels of the form "GUID-<n>" carry the guid of the referenced element.
721 if label.startswith('GUID-'):
722 ref_guid = int(label[5:])
724 expr = match.group("expr")
725 component = (match.group("component") or "")[1:] # skip the dot
726 attribute = match.group("attribute")
728 # split compound components into component kind and index
729 # eg: 'addr[0]' -> ('addr', '0')
730 component, component_index = self._netref_component_split(component)
732 # find object and resolve expression
733 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
734 if component not in self._NETREF_COMPONENT_GETTERS:
735 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
# Testbeds that do not own the referenced element are handled by this branch
# (NOTE(review): its body is not visible; presumably it skips to the next testbed).
736 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
# Fetch the referenced value through the getter registered for the component kind.
739 ref_value = self._NETREF_COMPONENT_GETTERS[component](
740 ref_testbed, ref_guid, component_index, attribute)
# Substitute the whole matched expression by the fetched value.
742 value = rv = value.replace(match.group(), ref_value)
745 # unresolvable netref
# Resolves the pending netrefs recorded in self._netrefs (element attributes) and in
# self._testbed_netrefs (testbed attributes). With fail_if_undefined set, a netref
# that cannot be resolved raises ValueError.
# NOTE(review): some short lines are missing from this listing (e.g. the removal of
# resolved names and the conditions guarding the "del" statements).
752 def do_netrefs(self, data, fail_if_undefined = False):
# Element attributes: the current value is read from the owning testbed and the
# resolved value is written back to it.
# .items() yields a list copy in Python 2, so deleting entries inside the loop is safe.
754 for (testbed_guid, guid), attrs in self._netrefs.items():
755 testbed = self._testbeds.get(testbed_guid)
756 if testbed is not None:
757 for name in set(attrs):
758 value = testbed.get(guid, name)
759 if isinstance(value, basestring):
760 ref_value = self.resolve_netref_value(value)
761 if ref_value is not None:
762 testbed.set(guid, name, ref_value)
764 elif fail_if_undefined:
765 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
767 del self._netrefs[(testbed_guid, guid)]
# Testbed attributes: values are read from and written back to the experiment "data".
770 for testbed_guid, attrs in self._testbed_netrefs.items():
771 tb_data = dict(data.get_attribute_data(testbed_guid))
773 for name in set(attrs):
774 value = tb_data.get(name)
775 if isinstance(value, basestring):
776 ref_value = self.resolve_netref_value(value)
777 if ref_value is not None:
778 data.set_attribute_data(testbed_guid, name, ref_value)
780 elif fail_if_undefined:
781 raise ValueError, "Unresolvable netref in: %r" % (value,)
783 del self._testbed_netrefs[testbed_guid]
# Creates the testbed controllers that do not exist yet, decides what to do with the
# ones affected by recovery, and programs the elements of the newly handled testbeds.
# Returns the pair (to_recover, to_restart).
# NOTE(review): many short lines (try/except/else, initialisation of label_guids,
# to_recover and to_restart, and the updates of the latter two) are missing from this
# listing, so the exact branch structure cannot be confirmed from here.
786 def _init_testbed_controllers(self, data, recover = False):
# Testbeds that already have a controller are not programmed again.
787 blacklist_testbeds = set(self._testbeds)
788 element_guids = list()
790 data_guids = data.guids
794 # gather label associations
795 for guid in data_guids:
796 if not data.is_testbed_data(guid):
797 (testbed_guid, factory_id) = data.get_box_data(guid)
798 label = data.get_attribute_data(guid, "label")
799 if label is not None:
# Labels must identify exactly one element.
800 if label in label_guids:
801 raise RuntimeError, "Label %r is not unique" % (label,)
802 label_guids[label] = guid
804 # create testbed controllers
805 for guid in data_guids:
806 if data.is_testbed_data(guid):
807 if guid not in self._testbeds:
809 self._create_testbed_controller(
810 guid, data, element_guids, recover)
813 blacklist_testbeds.add(guid)
# The recovery policy of the testbed decides how it is handled: both the RECOVER and
# the RESTART policies create a new controller with recover=False.
818 policy = self._testbed_recovery_policy(guid, data=data)
819 if policy == DC.POLICY_RECOVER:
820 self._create_testbed_controller(
821 guid, data, element_guids, False)
823 elif policy == DC.POLICY_RESTART:
824 self._create_testbed_controller(
825 guid, data, element_guids, False)
829 self._failed_testbeds.add(guid)
833 # queue programmable elements
834 # - that have not been programmed already (blacklist_testbeds)
835 # - including recovered or restarted testbeds
836 # - but those that have no unresolved netrefs
837 for guid in data_guids:
838 if not data.is_testbed_data(guid):
839 (testbed_guid, factory_id) = data.get_box_data(guid)
840 if testbed_guid not in blacklist_testbeds:
841 element_guids.append(guid)
843 # replace references to elements labels for its guid
844 self._resolve_labels(data, data_guids, label_guids)
846 # program testbed controllers
848 self._program_testbed_controllers(element_guids, data)
850 return to_recover, to_restart
# Rewrites netrefs that address an element by label so that they address it by guid
# ("GUID-<n>"), and records which attributes contain netrefs so that they can be
# resolved later.
# NOTE(review): some short lines are missing from this listing (among them the first
# argument of value.replace and the else branches).
852 def _resolve_labels(self, data, data_guids, label_guids):
853 netrefs = self._netrefs
854 testbed_netrefs = self._testbed_netrefs
855 for guid in data_guids:
856 for name, value in data.get_attribute_data(guid):
857 if isinstance(value, basestring):
859 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
860 label = match.group("label")
# Labels already in "GUID-<n>" form need no translation.
861 if not label.startswith('GUID-'):
862 ref_guid = label_guids.get(label)
863 if ref_guid is not None:
# Rebuild the netref with the guid in place of the label, keeping the expression.
864 value = value.replace(
866 ATTRIBUTE_PATTERN_GUID_SUB % dict(
867 guid = 'GUID-%d' % (ref_guid,),
868 expr = match.group("expr"),
871 data.set_attribute_data(guid, name, value)
873 # memorize which guid-attribute pairs require
874 # postprocessing, to avoid excessive controller-testbed
875 # communication at configuration time
876 # (which could require high-latency network I/O)
# Element attributes are recorded under (testbed_guid, guid); testbed attributes are
# recorded under the testbed guid.
877 if not data.is_testbed_data(guid):
878 (testbed_guid, factory_id) = data.get_box_data(guid)
879 netrefs[(testbed_guid, guid)].add(name)
881 testbed_netrefs[guid].add(name)
# Creates the controller (through nepi.util.proxy) for testbed "guid", configures it with
# the testbed attributes found in "data" and registers it in self._testbeds.
# NOTE(review): some short lines are missing from this listing (among them the statement
# following the netref detection and the last argument of create_testbed_controller);
# per the comments below, a testbed whose deployment configuration still contains a
# netref is not created yet and is remembered in self._netreffed_testbeds.
887 def _create_testbed_controller(self, guid, data, element_guids, recover):
888 (testbed_id, testbed_version) = data.get_testbed_data(guid)
# Reuse a deployment configuration stored earlier for this testbed, if any.
889 deployment_config = self._deployment_config.get(guid)
891 # deferred import because proxy needs
892 # our class definitions to define proxies
893 import nepi.util.proxy as proxy
895 if deployment_config is None:
897 deployment_config = proxy.AccessConfiguration()
899 for (name, value) in data.get_attribute_data(guid):
900 if value is not None and deployment_config.has_attribute(name):
901 # if any deployment config attribute has a netref, we can't
902 # create this controller yet
903 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
904 # remember to re-issue this one
905 self._netreffed_testbeds.add(guid)
908 # copy deployment config attribute
909 deployment_config.set_attribute_value(name, value)
912 self._deployment_config[guid] = deployment_config
914 if deployment_config is not None:
915 # force recovery mode
916 deployment_config.set_attribute_value("recover",recover)
918 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
# Every testbed attribute is handed to the controller as a deferred configuration.
920 for (name, value) in data.get_attribute_data(guid):
921 testbed.defer_configure(name, value)
922 self._testbeds[guid] = testbed
# The testbed exists now, so it no longer counts as postponed.
923 if guid in self._netreffed_testbeds:
924 self._netreffed_testbeds.remove(guid)
# Programs the testbed controllers with the elements listed in "element_guids":
# a first pass defers creation and initial attributes, a second pass defers traces,
# addresses, routes and the connections internal to each testbed.
# NOTE(review): some short lines are missing from this listing (try/except and return
# lines of the helper, and the continuation of a few calls).
926 def _program_testbed_controllers(self, element_guids, data):
# Helper: tries to resolve a netref contained in a creation-time attribute value.
927 def resolve_create_netref(data, guid, name, value):
928 # Try to resolve create-time netrefs, if possible
929 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
931 nuvalue = self.resolve_netref_value(value)
933 # Any trouble means we're not in shape to resolve the netref yet
935 if nuvalue is not None:
936 # Only if we succeed we remove the netref deferral entry
938 data.set_attribute_data(guid, name, value)
# NOTE(review): "testbed_guid" is not a parameter of this helper; it is taken from the
# enclosing function, where it is assigned inside the loop below.
939 if (testbed_guid, guid) in self._netrefs:
940 self._netrefs[(testbed_guid, guid)].discard(name)
# First pass: element creation and initial attribute values.
943 for guid in element_guids:
944 (testbed_guid, factory_id) = data.get_box_data(guid)
945 testbed = self._testbeds.get(testbed_guid)
946 if testbed is not None:
948 testbed.defer_create(guid, factory_id)
950 for (name, value) in data.get_attribute_data(guid):
951 value = resolve_create_netref(data, guid, name, value)
952 testbed.defer_create_set(guid, name, value)
# Second pass: traces, addresses, routes and internal connections.
954 for guid in element_guids:
955 (testbed_guid, factory_id) = data.get_box_data(guid)
956 testbed = self._testbeds.get(testbed_guid)
957 if testbed is not None:
959 for trace_id in data.get_trace_data(guid):
960 testbed.defer_add_trace(guid, trace_id)
962 for (address, netprefix, broadcast) in data.get_address_data(guid):
964 testbed.defer_add_address(guid, address, netprefix,
967 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
968 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
969 # store connections data
970 for (connector_type_name, other_guid, other_connector_type_name) \
971 in data.get_connection_data(guid):
972 (other_testbed_guid, other_factory_id) = data.get_box_data(
# Only connections between elements of the same testbed are programmed here.
974 if testbed_guid == other_testbed_guid:
975 # each testbed should take care of enforcing internal
976 # connection simmetry, so each connection is only
977 # added in one direction
978 testbed.defer_connect(guid, connector_type_name,
979 other_guid, other_connector_type_name)
# Programs the connections whose two ends live in different testbeds: each one is
# deferred on the local testbed through defer_cross_connect and remembered through
# _add_crossdata for the later cross-connection steps.
# NOTE(review): the continuation lines of two calls are missing from this listing.
981 def _program_testbed_cross_connections(self, data):
982 data_guids = data.guids
983 for guid in data_guids:
984 if not data.is_testbed_data(guid):
985 (testbed_guid, factory_id) = data.get_box_data(guid)
986 testbed = self._testbeds.get(testbed_guid)
987 if testbed is not None:
988 for (connector_type_name, cross_guid, cross_connector_type_name) \
989 in data.get_connection_data(guid):
990 (testbed_guid, factory_id) = data.get_box_data(guid)
991 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
# Only connections that span two different testbeds are handled here.
993 if testbed_guid != cross_testbed_guid:
# NOTE(review): plain indexing raises KeyError when the other testbed has no controller.
994 cross_testbed = self._testbeds[cross_testbed_guid]
995 cross_testbed_id = cross_testbed.testbed_id
996 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
997 cross_testbed_guid, cross_testbed_id, cross_factory_id,
998 cross_connector_type_name)
999 # save cross data for later
1000 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
    """Remember that *testbed_guid* is cross-connected to *cross_guid*.

    The bookkeeping lives in ``self._cross_data`` as
    ``{testbed_guid: {cross_testbed_guid: set(cross_guids)}}``; the missing
    levels are created on demand. *guid* itself is not stored.
    """
    per_testbed = self._cross_data.setdefault(testbed_guid, dict())
    remote_guids = per_testbed.setdefault(cross_testbed_guid, set())
    remote_guids.add(cross_guid)
# Gathers, for testbed "testbed_guid", the attributes of every remote element it is
# cross-connected to (as recorded by _add_crossdata), grouped by remote testbed guid
# and then by remote element guid.
# NOTE(review): some short lines are missing from this listing (the creation of
# "cross_data", the early exit taken when nothing was recorded for the testbed, one
# entry of elem_cross_data and the final return statement).
1010 def _get_cross_data(self, testbed_guid):
1012 if not testbed_guid in self._cross_data:
1015 # fetch attribute lists in one batch
1016 attribute_lists = dict()
1017 for cross_testbed_guid, guid_list in \
1018 self._cross_data[testbed_guid].iteritems():
1019 cross_testbed = self._testbeds[cross_testbed_guid]
1020 for cross_guid in guid_list:
1021 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1022 cross_testbed.get_attribute_list_deferred(cross_guid)
1024 # fetch attribute values in another batch
1025 for cross_testbed_guid, guid_list in \
1026 self._cross_data[testbed_guid].iteritems():
1027 cross_data[cross_testbed_guid] = dict()
1028 cross_testbed = self._testbeds[cross_testbed_guid]
1029 for cross_guid in guid_list:
# Every element entry starts with bookkeeping keys describing the remote testbed.
1030 elem_cross_data = dict(
1032 _testbed_guid = cross_testbed_guid,
1033 _testbed_id = cross_testbed.testbed_id,
1034 _testbed_version = cross_testbed.testbed_version)
1035 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
# NOTE(review): the attribute list is iterated here without going through _undefer,
# unlike in _update_execute_xml - confirm that the deferred result supports iteration.
1036 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1037 for attr_name in attribute_list:
1038 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1039 elem_cross_data[attr_name] = attr_value
1041 # undefer all values - we'll have to serialize them probably later
# Existing keys are only given new values here, so the dict does not change size
# while it is being iterated.
1042 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1043 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1044 for attr_name, attr_value in elem_cross_data.iteritems():
1045 elem_cross_data[attr_name] = _undefer(attr_value)