2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
19 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
20 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
21 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
23 def _undefer(deferred):
24 if hasattr(deferred, '_get'):
25 return deferred._get()
30 class TestbedController(object):
31 def __init__(self, testbed_id, testbed_version):
32 self._testbed_id = testbed_id
33 self._testbed_version = testbed_version
37 return self._testbed_id
40 def testbed_version(self):
41 return self._testbed_version
45 raise NotImplementedError
def defer_configure(self, name, value):
    """Instruct setting a configuration attribute for the testbed instance.

    name -- attribute name
    value -- attribute value

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create(self, guid, factory_id):
    """Instruct the creation of an element.

    guid -- guid of the element
    factory_id -- identifier of the factory for the element

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create_set(self, guid, name, value):
    """Instruct setting an initial attribute on an element.

    guid -- guid of the element
    name -- attribute name
    value -- attribute value

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_factory_set(self, guid, name, value):
    """Instruct setting an attribute on a factory.

    guid -- guid argument identifying the target
    name -- attribute name
    value -- attribute value

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_connect(self, guid1, connector_type_name1,
        guid2, connector_type_name2):
    """Instruct the creation of a connection between the given connectors.

    guid1, connector_type_name1 -- first endpoint of the connection
    guid2, connector_type_name2 -- second endpoint of the connection

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
68 def defer_cross_connect(self,
69 guid, connector_type_name,
70 cross_guid, cross_testbed_guid,
71 cross_testbed_id, cross_factory_id,
72 cross_connector_type_name):
74 Instructs creation of a connection between the given connectors
75 of different testbed instances
77 raise NotImplementedError
def defer_add_trace(self, guid, trace_id):
    """Instruct the addition of a trace.

    guid -- guid of the element the trace is added to
    trace_id -- identifier of the trace

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Instruct the addition of an address.

    guid -- guid of the element the address is added to
    address, netprefix, broadcast -- components of the address entry

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_route(self, guid, destination, netprefix, nexthop, metric=0):
    """Instruct the addition of a route.

    guid -- guid of the element the route is added to
    destination, netprefix, nexthop -- components of the route entry
    metric -- route metric, 0 when not given

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
92 """After do_setup the testbed initial configuration is done"""
93 raise NotImplementedError
97 After do_create all instructed elements are created and
100 raise NotImplementedError
102 def do_connect_init(self):
104 After do_connect_init all internal connections between testbed elements
107 raise NotImplementedError
109 def do_connect_compl(self):
111 After do_connect all internal connections between testbed elements
114 raise NotImplementedError
116 def do_preconfigure(self):
118 Done just before resolving netrefs, after connection, before cross connections,
119 useful for early stages of configuration, for setting up stuff that might be
120 required for netref resolution.
122 raise NotImplementedError
def do_configure(self):
    """Configuration step: after do_configure, elements are configured.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_prestart(self):
    """Pre-start step: elements are prestart-configured before do_start.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
132 def do_cross_connect_init(self, cross_data):
134 After do_cross_connect_init initiation of all external connections
135 between different testbed elements is performed
137 raise NotImplementedError
139 def do_cross_connect_compl(self, cross_data):
141 After do_cross_connect_compl completion of all external connections
142 between different testbed elements is performed
144 raise NotImplementedError
147 raise NotImplementedError
150 raise NotImplementedError
154 On testbed recovery (if recovery is a supported policy), the controller
155 instance will be re-created and the following sequence invoked:
158 defer_X - programming the testbed with persisted execution values
159 (not design values). Execution values (ExecImmutable attributes)
160 should be enough to recreate the testbed's state.
162 <cross-connection methods>
164 Start will not be called, and after cross connection invocations,
165 the testbed is supposed to be fully functional again.
167 raise NotImplementedError
def set(self, guid, name, value, time=TIME_NOW):
    """Placeholder for setting attribute `name` of `guid` to `value`.

    time -- defaults to TIME_NOW

    This base implementation always raises NotImplementedError.
    NOTE(review): presumably overridden by concrete testbed controllers.
    """
    raise NotImplementedError()
def get(self, guid, name, time=TIME_NOW):
    """Placeholder for reading attribute `name` of `guid`.

    time -- defaults to TIME_NOW

    This base implementation always raises NotImplementedError.
    NOTE(review): presumably overridden by concrete testbed controllers.
    """
    raise NotImplementedError()
175 def get_route(self, guid, index, attribute):
179 guid: guid of box to query
180 index: number of routing entry to fetch
181 attribute: one of Destination, NextHop, NetPrefix
183 raise NotImplementedError
185 def get_address(self, guid, index, attribute='Address'):
189 guid: guid of box to query
190 index: number of inteface to select
191 attribute: one of Address, NetPrefix, Broadcast
193 raise NotImplementedError
def get_attribute_list(self, guid, filter_flags=None, exclude=False):
    """Placeholder for listing the attributes of `guid`.

    filter_flags -- defaults to None
    exclude -- defaults to False
    NOTE(review): the filtering semantics are not visible here; confirm
    against a concrete implementation.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_factory_id(self, guid):
    """Placeholder for looking up the factory id of `guid`.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def action(self, time, guid, action):
    """Placeholder taking a time, a guid and an action.

    NOTE(review): the meaning of `action` is not visible here; confirm
    against a concrete implementation.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
def status(self, guid):
    """Placeholder for querying the status of `guid`.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
207 def testbed_status(self):
208 raise NotImplementedError
def trace(self, guid, trace_id, attribute='value'):
    """Placeholder for reading `attribute` of trace `trace_id` on `guid`.

    attribute -- defaults to 'value'

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
213 def traces_info(self):
214 """ dictionary of dictionaries:
220 filesize = size in bytes,
224 raise NotImplementedError
227 raise NotImplementedError
229 class ExperimentController(object):
230 def __init__(self, experiment_xml, root_dir):
231 self._experiment_design_xml = experiment_xml
232 self._experiment_execute_xml = None
233 self._testbeds = dict()
234 self._deployment_config = dict()
235 self._netrefs = collections.defaultdict(set)
236 self._testbed_netrefs = collections.defaultdict(set)
237 self._cross_data = dict()
238 self._root_dir = root_dir
239 self._netreffed_testbeds = set()
240 self._guids_in_testbed_cache = dict()
241 self._failed_testbeds = set()
242 self._started_time = None
243 self._stopped_time = None
244 self._testbed_order = []
246 self._logger = logging.getLogger('nepi.core.execute')
247 level = logging.ERROR
248 if os.environ.get("NEPI_CONTROLLER_LOGLEVEL",
249 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
250 level = logging.DEBUG
251 self._logger.setLevel(level)
253 if experiment_xml is None and root_dir is not None:
255 self.load_experiment_xml()
256 self.load_execute_xml()
258 self.persist_experiment_xml()
261 def experiment_design_xml(self):
262 return self._experiment_design_xml
265 def experiment_execute_xml(self):
266 return self._experiment_execute_xml
269 def started_time(self):
270 return self._started_time
273 def stopped_time(self):
274 return self._stopped_time
279 for testbed_guid in self._testbeds.keys():
280 _guids = self._guids_in_testbed(testbed_guid)
285 def persist_experiment_xml(self):
286 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
287 f = open(xml_path, "w")
288 f.write(self._experiment_design_xml)
291 def persist_execute_xml(self):
292 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
293 f = open(xml_path, "w")
294 f.write(self._experiment_execute_xml)
297 def load_experiment_xml(self):
298 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
299 f = open(xml_path, "r")
300 self._experiment_design_xml = f.read()
303 def load_execute_xml(self):
304 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
305 f = open(xml_path, "r")
306 self._experiment_execute_xml = f.read()
309 def trace(self, guid, trace_id, attribute='value'):
310 testbed = self._testbed_for_guid(guid)
312 return testbed.trace(guid, trace_id, attribute)
313 raise RuntimeError("No element exists with guid %d" % guid)
315 def traces_info(self):
317 for guid, testbed in self._testbeds.iteritems():
318 tinfo = testbed.traces_info()
320 traces_info[guid] = testbed.traces_info()
324 def _parallel(callables):
327 def wrapped(*p, **kw):
331 logging.exception("Exception occurred in asynchronous thread:")
332 excs.append(sys.exc_info())
334 wrapped = functools.wraps(callable)(wrapped)
336 # functools.partial not wrappable
339 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
340 for thread in threads:
342 for thread in threads:
345 eTyp, eVal, eLoc = exc
346 raise eTyp, eVal, eLoc
349 self._started_time = time.time()
352 def _start(self, recover = False):
353 parser = XmlExperimentParser()
356 xml = self._experiment_execute_xml
358 xml = self._experiment_design_xml
359 data = parser.from_xml_to_data(xml)
361 # instantiate testbed controllers
362 to_recover, to_restart = self._init_testbed_controllers(data, recover)
363 all_restart = set(to_restart)
366 # persist testbed connection data, for potential recovery
367 self._persist_testbed_proxies()
369 # recover recoverable controllers
370 for guid in to_recover:
372 self._testbeds[guid].do_setup()
373 self._testbeds[guid].recover()
375 self._logger.exception("During recovery of testbed %s", guid)
378 self._failed_testbeds.add(guid)
380 def steps_to_configure(self, allowed_guids):
381 # perform setup in parallel for all test beds,
382 # wait for all threads to finish
384 self._logger.debug("ExperimentController: Starting parallel do_setup")
385 self._parallel([testbed.do_setup
386 for guid,testbed in self._testbeds.iteritems()
387 if guid in allowed_guids])
389 # perform create-connect in parallel, wait
390 # (internal connections only)
391 self._logger.debug("ExperimentController: Starting parallel do_create")
392 self._parallel([testbed.do_create
393 for guid,testbed in self._testbeds.iteritems()
394 if guid in allowed_guids])
396 self._logger.debug("ExperimentController: Starting parallel do_connect_init")
397 self._parallel([testbed.do_connect_init
398 for guid,testbed in self._testbeds.iteritems()
399 if guid in allowed_guids])
401 self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
402 self._parallel([testbed.do_connect_compl
403 for guid,testbed in self._testbeds.iteritems()
404 if guid in allowed_guids])
406 self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
407 self._parallel([testbed.do_preconfigure
408 for guid,testbed in self._testbeds.iteritems()
409 if guid in allowed_guids])
412 # Store testbed order
413 self._testbed_order.append(allowed_guids)
415 steps_to_configure(self, to_restart)
417 if self._netreffed_testbeds:
418 self._logger.debug("ExperimentController: Resolving netreffed testbeds")
419 # initally resolve netrefs
420 self.do_netrefs(data, fail_if_undefined=False)
422 # rinse and repeat, for netreffed testbeds
423 netreffed_testbeds = set(self._netreffed_testbeds)
425 to_recover, to_restart = self._init_testbed_controllers(data, recover)
426 all_restart.update(to_restart)
429 # persist testbed connection data, for potential recovery
430 self._persist_testbed_proxies()
432 # recover recoverable controllers
433 for guid in to_recover:
435 self._testbeds[guid].do_setup()
436 self._testbeds[guid].recover()
438 self._logger.exception("During recovery of testbed %s", guid)
441 self._failed_testbeds.add(guid)
443 # configure dependant testbeds
444 steps_to_configure(self, to_restart)
446 all_restart = [ self._testbeds[guid] for guid in all_restart ]
448 # final netref step, fail if anything's left unresolved
449 self._logger.debug("ExperimentController: Resolving do_netrefs")
450 self.do_netrefs(data, fail_if_undefined=False)
452 # Only now, that netref dependencies have been solve, it is safe to
453 # program cross_connections
454 self._logger.debug("ExperimentController: Programming testbed cross-connections")
455 self._program_testbed_cross_connections(data)
457 # perform do_configure in parallel for al testbeds
458 # (it's internal configuration for each)
459 self._logger.debug("ExperimentController: Starting parallel do_configure")
460 self._parallel([testbed.do_configure
461 for testbed in all_restart])
465 #print >>sys.stderr, "DO IT"
469 # cross-connect (cannot be done in parallel)
470 self._logger.debug("ExperimentController: Starting cross-connect")
471 for guid, testbed in self._testbeds.iteritems():
472 cross_data = self._get_cross_data(guid)
473 testbed.do_cross_connect_init(cross_data)
474 for guid, testbed in self._testbeds.iteritems():
475 cross_data = self._get_cross_data(guid)
476 testbed.do_cross_connect_compl(cross_data)
480 # Last chance to configure (parallel on all testbeds)
481 self._logger.debug("ExperimentController: Starting parallel do_prestart")
482 self._parallel([testbed.do_prestart
483 for testbed in all_restart])
485 # final netref step, fail if anything's left unresolved
486 self.do_netrefs(data, fail_if_undefined=True)
491 # update execution xml with execution-specific values
492 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
493 self._update_execute_xml()
494 self.persist_execute_xml()
496 # start experiment (parallel start on all testbeds)
497 self._logger.debug("ExperimentController: Starting parallel do_start")
498 self._parallel([testbed.start
499 for testbed in all_restart])
503 def _clear_caches(self):
504 # Cleaning cache for safety.
505 self._guids_in_testbed_cache = dict()
507 def _persist_testbed_proxies(self):
508 TRANSIENT = (DC.RECOVER,)
510 # persist access configuration for all testbeds, so that
511 # recovery mode can reconnect to them if it becomes necessary
512 conf = ConfigParser.RawConfigParser()
513 for testbed_guid, testbed_config in self._deployment_config.iteritems():
514 testbed_guid = str(testbed_guid)
515 conf.add_section(testbed_guid)
516 for attr in testbed_config.get_attribute_list():
517 if attr not in TRANSIENT:
518 value = testbed_config.get_attribute_value(attr)
519 if value is not None:
520 conf.set(testbed_guid, attr, value)
522 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
526 def _load_testbed_proxies(self):
528 Attribute.STRING : 'get',
529 Attribute.BOOL : 'getboolean',
530 Attribute.ENUM : 'get',
531 Attribute.DOUBLE : 'getfloat',
532 Attribute.INTEGER : 'getint',
535 TRANSIENT = (DC.RECOVER,)
537 # deferred import because proxy needs
538 # our class definitions to define proxies
539 import nepi.util.proxy as proxy
541 conf = ConfigParser.RawConfigParser()
542 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
543 for testbed_guid in conf.sections():
544 testbed_config = proxy.AccessConfiguration()
545 testbed_guid = str(testbed_guid)
546 for attr in testbed_config.get_attribute_list():
547 if attr not in TRANSIENT:
548 getter = getattr(conf, TYPEMAP.get(
549 testbed_config.get_attribute_type(attr),
552 value = getter(testbed_guid, attr)
553 testbed_config.set_attribute_value(attr, value)
554 except ConfigParser.NoOptionError:
558 def _unpersist_testbed_proxies(self):
560 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
562 # Just print exceptions, this is just cleanup
563 self._logger.exception("Loading testbed configuration")
565 def _update_execute_xml(self):
567 # For all elements in testbed,
568 # - gather immutable execute-readable attribuets lists
570 # Generate new design description from design xml
571 # (Wait for attributes lists - implicit syncpoint)
573 # For all elements in testbed,
574 # - gather all immutable execute-readable attribute
575 # values, asynchronously
576 # (Wait for attribute values - implicit syncpoint)
578 # For all elements in testbed,
579 # - inject non-None values into new design
580 # Generate execute xml from new design
582 attribute_lists = dict(
583 (testbed_guid, collections.defaultdict(dict))
584 for testbed_guid in self._testbeds
587 for testbed_guid, testbed in self._testbeds.iteritems():
588 guids = self._guids_in_testbed(testbed_guid)
590 attribute_lists[testbed_guid][guid] = \
591 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
593 parser = XmlExperimentParser()
594 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
596 attribute_values = dict(
597 (testbed_guid, collections.defaultdict(dict))
598 for testbed_guid in self._testbeds
601 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
602 testbed = self._testbeds[testbed_guid]
603 for guid, attribute_list in testbed_attribute_lists.iteritems():
604 attribute_list = _undefer(attribute_list)
605 attribute_values[testbed_guid][guid] = dict(
606 (attribute, testbed.get_deferred(guid, attribute))
607 for attribute in attribute_list
610 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
611 for guid, attribute_values in testbed_attribute_values.iteritems():
612 for attribute, value in attribute_values.iteritems():
613 value = _undefer(value)
614 if value is not None:
615 execute_data.add_attribute_data(guid, attribute, value)
617 self._experiment_execute_xml = parser.to_xml(data=execute_data)
620 for testbed in self._testbeds.values():
622 self._unpersist_testbed_proxies()
623 self._stopped_time = time.time()
626 # reload perviously persisted testbed access configurations
627 self._failed_testbeds.clear()
628 self._load_testbed_proxies()
630 # re-program testbeds that need recovery
631 self._start(recover = True)
633 def is_finished(self, guid):
634 testbed = self._testbed_for_guid(guid)
636 return testbed.status(guid) == AS.STATUS_FINISHED
637 raise RuntimeError("No element exists with guid %d" % guid)
639 def _testbed_recovery_policy(self, guid, data = None):
641 parser = XmlExperimentParser()
642 data = parser.from_xml_to_data(self._experiment_design_xml)
644 return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
646 def status(self, guid):
647 if guid in self._testbeds:
649 # report testbed status
650 if guid in self._failed_testbeds:
651 return TS.STATUS_FAILED
654 return self._testbeds[guid].status()
656 return TS.STATUS_UNRESPONSIVE
659 testbed = self._testbed_for_guid(guid)
660 if testbed is not None:
661 return testbed.status(guid)
663 return AS.STATUS_UNDETERMINED
665 def set(self, guid, name, value, time = TIME_NOW):
666 testbed = self._testbed_for_guid(guid)
668 testbed.set(guid, name, value, time)
670 raise RuntimeError("No element exists with guid %d" % guid)
672 def get(self, guid, name, time = TIME_NOW):
673 testbed = self._testbed_for_guid(guid)
675 return testbed.get(guid, name, time)
676 raise RuntimeError("No element exists with guid %d" % guid)
678 def get_deferred(self, guid, name, time = TIME_NOW):
679 testbed = self._testbed_for_guid(guid)
681 return testbed.get_deferred(guid, name, time)
682 raise RuntimeError("No element exists with guid %d" % guid)
684 def get_factory_id(self, guid):
685 testbed = self._testbed_for_guid(guid)
687 return testbed.get_factory_id(guid)
688 raise RuntimeError("No element exists with guid %d" % guid)
690 def get_testbed_id(self, guid):
691 testbed = self._testbed_for_guid(guid)
693 return testbed.testbed_id
694 raise RuntimeError("No element exists with guid %d" % guid)
696 def get_testbed_version(self, guid):
697 testbed = self._testbed_for_guid(guid)
699 return testbed.testbed_version
700 raise RuntimeError("No element exists with guid %d" % guid)
704 ordered_testbeds = set()
706 def shutdown_testbed(guid):
708 testbed = self._testbeds[guid]
709 ordered_testbeds.add(guid)
712 exceptions.append(sys.exc_info())
714 self._logger.debug("ExperimentController: Starting parallel shutdown")
716 for testbed_guids in reversed(self._testbed_order):
717 testbed_guids = set(testbed_guids) - ordered_testbeds
718 self._logger.debug("ExperimentController: Shutting down %r", testbed_guids)
719 self._parallel([functools.partial(shutdown_testbed, guid)
720 for guid in testbed_guids])
721 remaining_guids = set(self._testbeds) - ordered_testbeds
723 self._logger.debug("ExperimentController: Shutted down %r", ordered_testbeds)
724 self._logger.debug("ExperimentController: Shutting down %r", remaining_guids)
725 self._parallel([functools.partial(shutdown_testbed, guid)
726 for guid in remaining_guids])
728 for exc_info in exceptions:
729 raise exc_info[0], exc_info[1], exc_info[2]
731 def _testbed_for_guid(self, guid):
732 for testbed_guid in self._testbeds.keys():
733 if guid in self._guids_in_testbed(testbed_guid):
734 if testbed_guid in self._failed_testbeds:
736 return self._testbeds[testbed_guid]
739 def _guids_in_testbed(self, testbed_guid):
740 if testbed_guid not in self._testbeds:
742 if testbed_guid not in self._guids_in_testbed_cache:
743 self._guids_in_testbed_cache[testbed_guid] = \
744 set(self._testbeds[testbed_guid].guids)
745 return self._guids_in_testbed_cache[testbed_guid]
748 def _netref_component_split(component):
749 match = COMPONENT_PATTERN.match(component)
751 return match.group("kind"), match.group("index")
753 return component, None
755 _NETREF_COMPONENT_GETTERS = {
757 lambda testbed, guid, index, name:
758 testbed.get_address(guid, int(index), name),
760 lambda testbed, guid, index, name:
761 testbed.get_route(guid, int(index), name),
763 lambda testbed, guid, index, name:
764 testbed.trace(guid, index, attribute = name),
766 lambda testbed, guid, index, name:
767 testbed.get(guid, name),
770 def resolve_netref_value(self, value, failval = None):
773 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
774 label = match.group("label")
775 if label.startswith('GUID-'):
776 ref_guid = int(label[5:])
778 expr = match.group("expr")
779 component = (match.group("component") or "")[1:] # skip the dot
780 attribute = match.group("attribute")
782 # split compound components into component kind and index
783 # eg: 'addr[0]' -> ('addr', '0')
784 component, component_index = self._netref_component_split(component)
786 # find object and resolve expression
787 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
788 if component not in self._NETREF_COMPONENT_GETTERS:
789 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
790 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
793 ref_value = self._NETREF_COMPONENT_GETTERS[component](
794 ref_testbed, ref_guid, component_index, attribute)
796 value = rv = value.replace(match.group(), ref_value)
799 # unresolvable netref
806 def do_netrefs(self, data, fail_if_undefined = False):
808 for (testbed_guid, guid), attrs in self._netrefs.items():
809 testbed = self._testbeds.get(testbed_guid)
810 if testbed is not None:
811 for name in set(attrs):
812 value = testbed.get(guid, name)
813 if isinstance(value, basestring):
814 ref_value = self.resolve_netref_value(value)
815 if ref_value is not None:
816 testbed.set(guid, name, ref_value)
818 elif fail_if_undefined:
819 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
821 del self._netrefs[(testbed_guid, guid)]
824 for testbed_guid, attrs in self._testbed_netrefs.items():
825 tb_data = dict(data.get_attribute_data(testbed_guid))
827 for name in set(attrs):
828 value = tb_data.get(name)
829 if isinstance(value, basestring):
830 ref_value = self.resolve_netref_value(value)
831 if ref_value is not None:
832 data.set_attribute_data(testbed_guid, name, ref_value)
834 elif fail_if_undefined:
835 raise ValueError, "Unresolvable netref in: %r" % (value,)
837 del self._testbed_netrefs[testbed_guid]
840 def _init_testbed_controllers(self, data, recover = False):
841 blacklist_testbeds = set(self._testbeds)
842 element_guids = list()
844 data_guids = data.guids
848 # gather label associations
849 for guid in data_guids:
850 if not data.is_testbed_data(guid):
851 (testbed_guid, factory_id) = data.get_box_data(guid)
852 label = data.get_attribute_data(guid, "label")
853 if label is not None:
854 if label in label_guids:
855 raise RuntimeError, "Label %r is not unique" % (label,)
856 label_guids[label] = guid
858 # create testbed controllers
859 for guid in data_guids:
860 if data.is_testbed_data(guid):
861 if guid not in self._testbeds:
863 self._create_testbed_controller(
864 guid, data, element_guids, recover)
867 blacklist_testbeds.add(guid)
872 policy = self._testbed_recovery_policy(guid, data=data)
873 if policy == DC.POLICY_RECOVER:
874 self._create_testbed_controller(
875 guid, data, element_guids, False)
877 elif policy == DC.POLICY_RESTART:
878 self._create_testbed_controller(
879 guid, data, element_guids, False)
883 self._failed_testbeds.add(guid)
887 # queue programmable elements
888 # - that have not been programmed already (blacklist_testbeds)
889 # - including recovered or restarted testbeds
890 # - but those that have no unresolved netrefs
891 for guid in data_guids:
892 if not data.is_testbed_data(guid):
893 (testbed_guid, factory_id) = data.get_box_data(guid)
894 if testbed_guid not in blacklist_testbeds:
895 element_guids.append(guid)
897 # replace references to elements labels for its guid
898 self._resolve_labels(data, data_guids, label_guids)
900 # program testbed controllers
902 self._program_testbed_controllers(element_guids, data)
904 return to_recover, to_restart
906 def _resolve_labels(self, data, data_guids, label_guids):
907 netrefs = self._netrefs
908 testbed_netrefs = self._testbed_netrefs
909 for guid in data_guids:
910 for name, value in data.get_attribute_data(guid):
911 if isinstance(value, basestring):
913 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
914 label = match.group("label")
915 if not label.startswith('GUID-'):
916 ref_guid = label_guids.get(label)
917 if ref_guid is not None:
918 value = value.replace(
920 ATTRIBUTE_PATTERN_GUID_SUB % dict(
921 guid = 'GUID-%d' % (ref_guid,),
922 expr = match.group("expr"),
925 data.set_attribute_data(guid, name, value)
927 # memorize which guid-attribute pairs require
928 # postprocessing, to avoid excessive controller-testbed
929 # communication at configuration time
930 # (which could require high-latency network I/O)
931 if not data.is_testbed_data(guid):
932 (testbed_guid, factory_id) = data.get_box_data(guid)
933 netrefs[(testbed_guid, guid)].add(name)
935 testbed_netrefs[guid].add(name)
941 def _create_testbed_controller(self, guid, data, element_guids, recover):
942 (testbed_id, testbed_version) = data.get_testbed_data(guid)
943 deployment_config = self._deployment_config.get(guid)
945 # deferred import because proxy needs
946 # our class definitions to define proxies
947 import nepi.util.proxy as proxy
949 if deployment_config is None:
951 deployment_config = proxy.AccessConfiguration()
953 for (name, value) in data.get_attribute_data(guid):
954 if value is not None and deployment_config.has_attribute(name):
955 # if any deployment config attribute has a netref, we can't
956 # create this controller yet
957 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
958 # remember to re-issue this one
959 self._netreffed_testbeds.add(guid)
962 # copy deployment config attribute
963 deployment_config.set_attribute_value(name, value)
966 self._deployment_config[guid] = deployment_config
968 if deployment_config is not None:
969 # force recovery mode
970 deployment_config.set_attribute_value("recover",recover)
972 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
974 for (name, value) in data.get_attribute_data(guid):
975 testbed.defer_configure(name, value)
976 self._testbeds[guid] = testbed
977 if guid in self._netreffed_testbeds:
978 self._netreffed_testbeds.remove(guid)
980 def _program_testbed_controllers(self, element_guids, data):
981 def resolve_create_netref(data, guid, name, value):
982 # Try to resolve create-time netrefs, if possible
983 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
985 nuvalue = self.resolve_netref_value(value)
987 # Any trouble means we're not in shape to resolve the netref yet
989 if nuvalue is not None:
990 # Only if we succeed we remove the netref deferral entry
992 data.set_attribute_data(guid, name, value)
993 if (testbed_guid, guid) in self._netrefs:
994 self._netrefs[(testbed_guid, guid)].discard(name)
997 for guid in element_guids:
998 (testbed_guid, factory_id) = data.get_box_data(guid)
999 testbed = self._testbeds.get(testbed_guid)
1000 if testbed is not None:
1002 testbed.defer_create(guid, factory_id)
1004 for (name, value) in data.get_attribute_data(guid):
1005 value = resolve_create_netref(data, guid, name, value)
1006 testbed.defer_create_set(guid, name, value)
1008 for guid in element_guids:
1009 (testbed_guid, factory_id) = data.get_box_data(guid)
1010 testbed = self._testbeds.get(testbed_guid)
1011 if testbed is not None:
1013 for trace_id in data.get_trace_data(guid):
1014 testbed.defer_add_trace(guid, trace_id)
1016 for (address, netprefix, broadcast) in data.get_address_data(guid):
1018 testbed.defer_add_address(guid, address, netprefix,
1021 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
1022 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
1023 # store connections data
1024 for (connector_type_name, other_guid, other_connector_type_name) \
1025 in data.get_connection_data(guid):
1026 (other_testbed_guid, other_factory_id) = data.get_box_data(
1028 if testbed_guid == other_testbed_guid:
1029 # each testbed should take care of enforcing internal
1030 # connection simmetry, so each connection is only
1031 # added in one direction
1032 testbed.defer_connect(guid, connector_type_name,
1033 other_guid, other_connector_type_name)
1035 def _program_testbed_cross_connections(self, data):
1036 data_guids = data.guids
1037 for guid in data_guids:
1038 if not data.is_testbed_data(guid):
1039 (testbed_guid, factory_id) = data.get_box_data(guid)
1040 testbed = self._testbeds.get(testbed_guid)
1041 if testbed is not None:
1042 for (connector_type_name, cross_guid, cross_connector_type_name) \
1043 in data.get_connection_data(guid):
1044 (testbed_guid, factory_id) = data.get_box_data(guid)
1045 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1047 if testbed_guid != cross_testbed_guid:
1048 cross_testbed = self._testbeds[cross_testbed_guid]
1049 cross_testbed_id = cross_testbed.testbed_id
1050 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
1051 cross_testbed_guid, cross_testbed_id, cross_factory_id,
1052 cross_connector_type_name)
1053 # save cross data for later
1054 self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1055 (testbed_guid, guid, cross_testbed_guid, cross_guid))
1056 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1059 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1060 if testbed_guid not in self._cross_data:
1061 self._cross_data[testbed_guid] = dict()
1062 if cross_testbed_guid not in self._cross_data[testbed_guid]:
1063 self._cross_data[testbed_guid][cross_testbed_guid] = set()
1064 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1066 def _get_cross_data(self, testbed_guid):
1068 if not testbed_guid in self._cross_data:
1071 # fetch attribute lists in one batch
1072 attribute_lists = dict()
1073 for cross_testbed_guid, guid_list in \
1074 self._cross_data[testbed_guid].iteritems():
1075 cross_testbed = self._testbeds[cross_testbed_guid]
1076 for cross_guid in guid_list:
1077 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1078 cross_testbed.get_attribute_list_deferred(cross_guid)
1080 # fetch attribute values in another batch
1081 for cross_testbed_guid, guid_list in \
1082 self._cross_data[testbed_guid].iteritems():
1083 cross_data[cross_testbed_guid] = dict()
1084 cross_testbed = self._testbeds[cross_testbed_guid]
1085 for cross_guid in guid_list:
1086 elem_cross_data = dict(
1088 _testbed_guid = cross_testbed_guid,
1089 _testbed_id = cross_testbed.testbed_id,
1090 _testbed_version = cross_testbed.testbed_version)
1091 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1092 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1093 for attr_name in attribute_list:
1094 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1095 elem_cross_data[attr_name] = attr_value
1097 # undefer all values - we'll have to serialize them probably later
1098 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1099 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1100 for attr_name, attr_value in elem_cross_data.iteritems():
1101 elem_cross_data[attr_name] = _undefer(attr_value)
1105 class ExperimentSuite(object):
1106 def __init__(self, experiment_xml, access_config, repetitions,
1107 duration, wait_guids):
1108 self._experiment_xml = experiment_xml
1109 self._access_config = access_config
1110 self._experiments = dict()
1111 self._repetitions = repetitions
1112 self._duration = duration
1113 self._wait_guids = wait_guids
1114 self._current = None
1115 self._status = TS.STATUS_ZERO
1119 self._status = TS.STATUS_STARTED
1120 self._thread = threading.Thread(target = self._run_experiment_suite)
1121 self._thread.start()
1128 def _run_experiment_suite(self):
1129 for i in xrange[0, self.repetitions]:
1131 self._run_one_experiment()
1133 def _run_one_experiment(self):
1134 access_config = proxy.AccessConfiguration()
1135 for attr in self._access_config.attributes:
1136 access_config.set_attribute_value(attr.name, attr.value)
1137 access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1138 root_dir = "%s_%d" % (
1139 access_config.get_attribute_value(DC.ROOT_DIRECTORY),
1141 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1142 controller = proxy.create_experiment_controller(self._experiment_xml,
1144 self._experiments[self._current] = controller
1146 started_at = time.time()
1147 # wait until all specified guids have finished execution
1148 if self._wait_guids:
1149 while all(itertools.imap(controller.is_finished, self._wait_guids):
1151 # wait until the minimum experiment duration time has elapsed
1153 while (time.time() - started_at) < self._duration:
1157 controller.shutdown()