1 # -*- coding: utf-8 -*-
3 from nepi.core.attributes import Attribute, AttributesMap
4 from nepi.util import validation
5 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
6 from nepi.util.parser._xml import XmlExperimentParser
# Netref syntax: {#[<label>](.<component>[<index>])?.[<attribute>]#}
#   label     - element label, or "GUID-<n>" once labels have been resolved
#   component - optional ".addr[i]", ".route[i]" or ".trace[id]" selector
#   attribute - name of the attribute to read from the selected component
# The dot that precedes "[<attribute>]" is escaped so that only a literal
# "." is accepted as the separator (an unescaped "." matches any character).
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# Template used to rewrite a label-based netref into its GUID-based form.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into kind ("addr") and index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
def _undefer(deferred):
    """Return the concrete value behind a possibly-deferred result.

    Objects exposing a ``_get`` method are treated as deferred results and
    resolved by calling it; any other value is handed back unchanged, so the
    function is safe to apply to plain values.
    """
    if hasattr(deferred, '_get'):
        return deferred._get()
    else:
        return deferred
class TestbedController(object):
    """Abstract interface of a testbed controller.

    Every operation raises NotImplementedError here; concrete testbeds
    override them. The ``defer_*`` methods program the testbed, the ``do_*``
    methods carry out the programmed steps, and the remaining methods query
    or drive a running testbed.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Identifier of the testbed type."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Version of the testbed type."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements handled by this testbed instance."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element """
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self,
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and
        their initial attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect all internal connections between testbed elements
        are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self, time = TIME_NOW):
        """Starts the testbed."""
        raise NotImplementedError

    def stop(self, time = TIME_NOW):
        """Stops the testbed."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:

            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>

        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Sets attribute `name` of element `guid` to `value`."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Returns the value of attribute `name` of element `guid`."""
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """Returns the attribute names of element `guid`, optionally filtered by flags."""
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Returns the factory id of element `guid`."""
        raise NotImplementedError

    def action(self, time, guid, action):
        raise NotImplementedError

    def status(self, guid):
        """Returns the status of element `guid`."""
        raise NotImplementedError

    def testbed_status(self):
        """Returns the status of the testbed itself."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Returns `attribute` of trace `trace_id` of element `guid`."""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries, keyed by guid and then by trace id;
        each trace entry describes the trace (for instance
        filesize = size in bytes)
        """
        raise NotImplementedError

    def shutdown(self):
        """Releases the testbed."""
        raise NotImplementedError
228 class ExperimentController(object):
def __init__(self, experiment_xml, root_dir):
    """Set up controller state and persist (or reload) the experiment xml.

    experiment_xml: design description of the experiment, or None when
        recovering a previously persisted experiment.
    root_dir: directory where the experiment description is persisted.
    """
    self._root_dir = root_dir
    self._experiment_design_xml = experiment_xml
    self._experiment_execute_xml = None

    # testbed bookkeeping
    self._testbeds = dict()
    self._testbed_order = []
    self._failed_testbeds = set()
    self._deployment_config = dict()
    self._guids_in_testbed_cache = dict()
    self._cross_data = dict()

    # pending netref resolutions
    self._netrefs = collections.defaultdict(set)
    self._testbed_netrefs = collections.defaultdict(set)
    self._netreffed_testbeds = set()

    self._started_time = None
    self._stopped_time = None

    # log errors only, unless debugging was requested through the environment
    self._logger = logging.getLogger('nepi.core.execute')
    requested_level = os.environ.get("NEPI_CONTROLLER_LOGLEVEL",
            DC.ERROR_LEVEL)
    if requested_level == DC.DEBUG_LEVEL:
        self._logger.setLevel(logging.DEBUG)
    else:
        self._logger.setLevel(logging.ERROR)

    recovering = experiment_xml is None and root_dir is not None
    if recovering:
        # Recover: no design was given, reload what was persisted
        self.load_experiment_xml()
        self.load_execute_xml()
    else:
        self.persist_experiment_xml()
def experiment_design_xml(self):
    """Return the design-time experiment description xml held by the controller."""
    return self._experiment_design_xml
def experiment_execute_xml(self):
    """Return the execution-time experiment description xml (None until generated or loaded)."""
    return self._experiment_execute_xml
def started_time(self):
    """Return the recorded start timestamp, or None if none was recorded."""
    return self._started_time
def stopped_time(self):
    """Return the recorded stop timestamp, or None if none was recorded."""
    return self._stopped_time
278 for testbed_guid in self._testbeds.keys():
279 _guids = self._guids_in_testbed(testbed_guid)
284 def persist_experiment_xml(self):
285 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
286 f = open(xml_path, "w")
287 f.write(self._experiment_design_xml)
290 def persist_execute_xml(self):
291 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
292 f = open(xml_path, "w")
293 f.write(self._experiment_execute_xml)
296 def load_experiment_xml(self):
297 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
298 f = open(xml_path, "r")
299 self._experiment_design_xml = f.read()
302 def load_execute_xml(self):
303 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
304 f = open(xml_path, "r")
305 self._experiment_execute_xml = f.read()
308 def trace(self, guid, trace_id, attribute='value'):
309 testbed = self._testbed_for_guid(guid)
311 return testbed.trace(guid, trace_id, attribute)
312 raise RuntimeError("No element exists with guid %d" % guid)
314 def traces_info(self):
316 for guid, testbed in self._testbeds.iteritems():
317 tinfo = testbed.traces_info()
319 traces_info[guid] = testbed.traces_info()
323 def _parallel(callables):
326 def wrapped(*p, **kw):
330 logging.exception("Exception occurred in asynchronous thread:")
331 excs.append(sys.exc_info())
333 wrapped = functools.wraps(callable)(wrapped)
335 # functools.partial not wrappable
338 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
339 for thread in threads:
341 for thread in threads:
344 eTyp, eVal, eLoc = exc
345 raise eTyp, eVal, eLoc
348 self._started_time = time.time()
351 def _start(self, recover = False):
352 parser = XmlExperimentParser()
355 xml = self._experiment_execute_xml
357 xml = self._experiment_design_xml
358 data = parser.from_xml_to_data(xml)
360 # instantiate testbed controllers
361 to_recover, to_restart = self._init_testbed_controllers(data, recover)
362 all_restart = set(to_restart)
365 # persist testbed connection data, for potential recovery
366 self._persist_testbed_proxies()
368 # recover recoverable controllers
369 for guid in to_recover:
371 self._testbeds[guid].do_setup()
372 self._testbeds[guid].recover()
374 self._logger.exception("During recovery of testbed %s", guid)
377 self._failed_testbeds.add(guid)
379 def steps_to_configure(self, allowed_guids):
380 # perform setup in parallel for all test beds,
381 # wait for all threads to finish
383 self._logger.debug("ExperimentController: Starting parallel do_setup")
384 self._parallel([testbed.do_setup
385 for guid,testbed in self._testbeds.iteritems()
386 if guid in allowed_guids])
388 # perform create-connect in parallel, wait
389 # (internal connections only)
390 self._logger.debug("ExperimentController: Starting parallel do_create")
391 self._parallel([testbed.do_create
392 for guid,testbed in self._testbeds.iteritems()
393 if guid in allowed_guids])
395 self._logger.debug("ExperimentController: Starting parallel do_connect_init")
396 self._parallel([testbed.do_connect_init
397 for guid,testbed in self._testbeds.iteritems()
398 if guid in allowed_guids])
400 self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
401 self._parallel([testbed.do_connect_compl
402 for guid,testbed in self._testbeds.iteritems()
403 if guid in allowed_guids])
405 self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
406 self._parallel([testbed.do_preconfigure
407 for guid,testbed in self._testbeds.iteritems()
408 if guid in allowed_guids])
411 # Store testbed order
412 self._testbed_order.append(allowed_guids)
414 steps_to_configure(self, to_restart)
416 if self._netreffed_testbeds:
417 self._logger.debug("ExperimentController: Resolving netreffed testbeds")
418 # initally resolve netrefs
419 self.do_netrefs(data, fail_if_undefined=False)
421 # rinse and repeat, for netreffed testbeds
422 netreffed_testbeds = set(self._netreffed_testbeds)
424 to_recover, to_restart = self._init_testbed_controllers(data, recover)
425 all_restart.update(to_restart)
428 # persist testbed connection data, for potential recovery
429 self._persist_testbed_proxies()
431 # recover recoverable controllers
432 for guid in to_recover:
434 self._testbeds[guid].do_setup()
435 self._testbeds[guid].recover()
437 self._logger.exception("During recovery of testbed %s", guid)
440 self._failed_testbeds.add(guid)
442 # configure dependant testbeds
443 steps_to_configure(self, to_restart)
445 all_restart = [ self._testbeds[guid] for guid in all_restart ]
447 # final netref step, fail if anything's left unresolved
448 self._logger.debug("ExperimentController: Resolving do_netrefs")
449 self.do_netrefs(data, fail_if_undefined=False)
451 # Only now, that netref dependencies have been solve, it is safe to
452 # program cross_connections
453 self._logger.debug("ExperimentController: Programming testbed cross-connections")
454 self._program_testbed_cross_connections(data)
456 # perform do_configure in parallel for al testbeds
457 # (it's internal configuration for each)
458 self._logger.debug("ExperimentController: Starting parallel do_configure")
459 self._parallel([testbed.do_configure
460 for testbed in all_restart])
464 #print >>sys.stderr, "DO IT"
468 # cross-connect (cannot be done in parallel)
469 self._logger.debug("ExperimentController: Starting cross-connect")
470 for guid, testbed in self._testbeds.iteritems():
471 cross_data = self._get_cross_data(guid)
472 testbed.do_cross_connect_init(cross_data)
473 for guid, testbed in self._testbeds.iteritems():
474 cross_data = self._get_cross_data(guid)
475 testbed.do_cross_connect_compl(cross_data)
479 # Last chance to configure (parallel on all testbeds)
480 self._logger.debug("ExperimentController: Starting parallel do_prestart")
481 self._parallel([testbed.do_prestart
482 for testbed in all_restart])
484 # final netref step, fail if anything's left unresolved
485 self.do_netrefs(data, fail_if_undefined=True)
490 # update execution xml with execution-specific values
491 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
492 self._update_execute_xml()
493 self.persist_execute_xml()
495 # start experiment (parallel start on all testbeds)
496 self._logger.debug("ExperimentController: Starting parallel do_start")
497 self._parallel([testbed.start
498 for testbed in all_restart])
502 def _clear_caches(self):
503 # Cleaning cache for safety.
504 self._guids_in_testbed_cache = dict()
def _persist_testbed_proxies(self):
    """Save the access configuration of every testbed to
    <root_dir>/deployment_config.ini.

    One section is written per testbed guid, holding every attribute with
    a non-None value except the transient ones.
    """
    # attributes that must not survive across runs
    TRANSIENT = (DC.RECOVER,)

    # persist access configuration for all testbeds, so that
    # recovery mode can reconnect to them if it becomes necessary
    conf = ConfigParser.RawConfigParser()
    for testbed_guid, testbed_config in self._deployment_config.iteritems():
        testbed_guid = str(testbed_guid)
        conf.add_section(testbed_guid)
        for attr in testbed_config.get_attribute_list():
            if attr not in TRANSIENT:
                value = testbed_config.get_attribute_value(attr)
                if value is not None:
                    conf.set(testbed_guid, attr, value)

    f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
    try:
        conf.write(f)
    finally:
        # always release the handle, even if the write fails
        f.close()
525 def _load_testbed_proxies(self):
527 Attribute.STRING : 'get',
528 Attribute.BOOL : 'getboolean',
529 Attribute.ENUM : 'get',
530 Attribute.DOUBLE : 'getfloat',
531 Attribute.INTEGER : 'getint',
534 TRANSIENT = (DC.RECOVER,)
536 # deferred import because proxy needs
537 # our class definitions to define proxies
538 import nepi.util.proxy as proxy
540 conf = ConfigParser.RawConfigParser()
541 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
542 for testbed_guid in conf.sections():
543 testbed_config = proxy.AccessConfiguration()
544 testbed_guid = str(testbed_guid)
545 for attr in testbed_config.get_attribute_list():
546 if attr not in TRANSIENT:
547 getter = getattr(conf, TYPEMAP.get(
548 testbed_config.get_attribute_type(attr),
551 value = getter(testbed_guid, attr)
552 testbed_config.set_attribute_value(attr, value)
553 except ConfigParser.NoOptionError:
557 def _unpersist_testbed_proxies(self):
559 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
561 # Just print exceptions, this is just cleanup
562 self._logger.exception("Loading testbed configuration")
def _update_execute_xml(self):
    """Regenerate the execution xml from the design xml plus the values
    the testbeds report for their ExecImmutable attributes.

    Requests are issued in batches and only resolved afterwards, so that
    each batch has an implicit synchronization point.
    """
    # For all testbeds,
    #   For all elements in testbed,
    #       - gather immutable execute-readable attributes lists
    #         asynchronously
    # Generate new design description from design xml
    # (Wait for attributes lists - implicit syncpoint)
    # For all testbeds,
    #   For all elements in testbed,
    #       - gather all immutable execute-readable attribute
    #         values, asynchronously
    # (Wait for attribute values - implicit syncpoint)
    # For all testbeds,
    #   For all elements in testbed,
    #       - inject non-None values into new design
    # Generate execute xml from new design

    attribute_lists = dict(
        (testbed_guid, collections.defaultdict(dict))
        for testbed_guid in self._testbeds
    )

    for testbed_guid, testbed in self._testbeds.iteritems():
        guids = self._guids_in_testbed(testbed_guid)
        for guid in guids:
            attribute_lists[testbed_guid][guid] = \
                testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)

    parser = XmlExperimentParser()
    execute_data = parser.from_xml_to_data(self._experiment_design_xml)

    attribute_values = dict(
        (testbed_guid, collections.defaultdict(dict))
        for testbed_guid in self._testbeds
    )

    for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
        testbed = self._testbeds[testbed_guid]
        for guid, attribute_list in testbed_attribute_lists.iteritems():
            attribute_list = _undefer(attribute_list)
            attribute_values[testbed_guid][guid] = dict(
                (attribute, testbed.get_deferred(guid, attribute))
                for attribute in attribute_list
            )

    for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
        # distinct name: do not rebind the dictionary being iterated
        for guid, element_values in testbed_attribute_values.iteritems():
            for attribute, value in element_values.iteritems():
                value = _undefer(value)
                if value is not None:
                    execute_data.add_attribute_data(guid, attribute, value)

    self._experiment_execute_xml = parser.to_xml(data=execute_data)
619 for testbed in self._testbeds.values():
621 self._unpersist_testbed_proxies()
622 self._stopped_time = time.time()
625 # reload perviously persisted testbed access configurations
626 self._failed_testbeds.clear()
627 self._load_testbed_proxies()
629 # re-program testbeds that need recovery
630 self._start(recover = True)
def is_finished(self, guid):
    """Tell whether element `guid` reports AS.STATUS_FINISHED.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.status(guid) == AS.STATUS_FINISHED
def _testbed_recovery_policy(self, guid, data = None):
    """Return the DC.RECOVERY_POLICY attribute of testbed `guid`.

    data: parsed experiment data to read from; when omitted, the design
        xml is parsed on the spot.
    """
    if data is None:
        parser = XmlExperimentParser()
        data = parser.from_xml_to_data(self._experiment_design_xml)

    return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
def status(self, guid):
    """Return the status of a testbed or of an element.

    For a testbed guid: TS.STATUS_FAILED if it is marked as failed,
    the status it reports otherwise, or TS.STATUS_UNRESPONSIVE if asking
    it fails. For an element guid: the status reported by its testbed,
    or AS.STATUS_UNDETERMINED when no testbed is found for it.
    """
    if guid in self._testbeds:
        # guid is a testbed
        # report testbed status
        if guid in self._failed_testbeds:
            return TS.STATUS_FAILED
        else:
            try:
                # NOTE(review): called without a guid although the abstract
                # interface declares status(self, guid) and a separate
                # testbed_status() - confirm which one is intended here.
                return self._testbeds[guid].status()
            except Exception:
                return TS.STATUS_UNRESPONSIVE
    else:
        # guid is an element
        testbed = self._testbed_for_guid(guid)
        if testbed is not None:
            return testbed.status(guid)
        else:
            return AS.STATUS_UNDETERMINED
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute `name` of element `guid` to `value`.

    The request is forwarded to the testbed handling `guid`.
    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    testbed.set(guid, name, value, time)
def get(self, guid, name, time = TIME_NOW):
    """Return the value of attribute `name` of element `guid`.

    The query is forwarded to the testbed handling `guid`.
    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get(guid, name, time)
def get_deferred(self, guid, name, time = TIME_NOW):
    """Return what the testbed's get_deferred gives for attribute `name`
    of element `guid`.

    Raises RuntimeError when no testbed is found for `guid`.
    """
    testbed = self._testbed_for_guid(guid)
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    return testbed.get_deferred(guid, name, time)
683 def get_factory_id(self, guid):
684 testbed = self._testbed_for_guid(guid)
686 return testbed.get_factory_id(guid)
687 raise RuntimeError("No element exists with guid %d" % guid)
689 def get_testbed_id(self, guid):
690 testbed = self._testbed_for_guid(guid)
692 return testbed.testbed_id
693 raise RuntimeError("No element exists with guid %d" % guid)
695 def get_testbed_version(self, guid):
696 testbed = self._testbed_for_guid(guid)
698 return testbed.testbed_version
699 raise RuntimeError("No element exists with guid %d" % guid)
703 ordered_testbeds = set()
705 def shutdown_testbed(guid):
707 testbed = self._testbeds[guid]
708 ordered_testbeds.add(guid)
711 exceptions.append(sys.exc_info())
713 self._logger.debug("ExperimentController: Starting parallel shutdown")
715 for testbed_guids in reversed(self._testbed_order):
716 testbed_guids = set(testbed_guids) - ordered_testbeds
717 self._logger.debug("ExperimentController: Shutting down %r", testbed_guids)
718 self._parallel([functools.partial(shutdown_testbed, guid)
719 for guid in testbed_guids])
720 remaining_guids = set(self._testbeds) - ordered_testbeds
722 self._logger.debug("ExperimentController: Shutted down %r", ordered_testbeds)
723 self._logger.debug("ExperimentController: Shutting down %r", remaining_guids)
724 self._parallel([functools.partial(shutdown_testbed, guid)
725 for guid in remaining_guids])
727 for exc_info in exceptions:
728 raise exc_info[0], exc_info[1], exc_info[2]
730 def _testbed_for_guid(self, guid):
731 for testbed_guid in self._testbeds.keys():
732 if guid in self._guids_in_testbed(testbed_guid):
733 if testbed_guid in self._failed_testbeds:
735 return self._testbeds[testbed_guid]
738 def _guids_in_testbed(self, testbed_guid):
739 if testbed_guid not in self._testbeds:
741 if testbed_guid not in self._guids_in_testbed_cache:
742 self._guids_in_testbed_cache[testbed_guid] = \
743 set(self._testbeds[testbed_guid].guids)
744 return self._guids_in_testbed_cache[testbed_guid]
@staticmethod
def _netref_component_split(component):
    """Split a compound netref component into (kind, index).

    eg: 'addr[0]' -> ('addr', '0'). A component that does not match
    COMPONENT_PATTERN is returned unchanged with a None index.
    """
    match = COMPONENT_PATTERN.match(component)
    if match:
        return match.group("kind"), match.group("index")
    else:
        return component, None
754 _NETREF_COMPONENT_GETTERS = {
756 lambda testbed, guid, index, name:
757 testbed.get_address(guid, int(index), name),
759 lambda testbed, guid, index, name:
760 testbed.get_route(guid, int(index), name),
762 lambda testbed, guid, index, name:
763 testbed.trace(guid, index, attribute = name),
765 lambda testbed, guid, index, name:
766 testbed.get(guid, name),
769 def resolve_netref_value(self, value, failval = None):
772 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
773 label = match.group("label")
774 if label.startswith('GUID-'):
775 ref_guid = int(label[5:])
777 expr = match.group("expr")
778 component = (match.group("component") or "")[1:] # skip the dot
779 attribute = match.group("attribute")
781 # split compound components into component kind and index
782 # eg: 'addr[0]' -> ('addr', '0')
783 component, component_index = self._netref_component_split(component)
785 # find object and resolve expression
786 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
787 if component not in self._NETREF_COMPONENT_GETTERS:
788 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
789 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
792 ref_value = self._NETREF_COMPONENT_GETTERS[component](
793 ref_testbed, ref_guid, component_index, attribute)
795 value = rv = value.replace(match.group(), ref_value)
798 # unresolvable netref
805 def do_netrefs(self, data, fail_if_undefined = False):
807 for (testbed_guid, guid), attrs in self._netrefs.items():
808 testbed = self._testbeds.get(testbed_guid)
809 if testbed is not None:
810 for name in set(attrs):
811 value = testbed.get(guid, name)
812 if isinstance(value, basestring):
813 ref_value = self.resolve_netref_value(value)
814 if ref_value is not None:
815 testbed.set(guid, name, ref_value)
817 elif fail_if_undefined:
818 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
820 del self._netrefs[(testbed_guid, guid)]
823 for testbed_guid, attrs in self._testbed_netrefs.items():
824 tb_data = dict(data.get_attribute_data(testbed_guid))
826 for name in set(attrs):
827 value = tb_data.get(name)
828 if isinstance(value, basestring):
829 ref_value = self.resolve_netref_value(value)
830 if ref_value is not None:
831 data.set_attribute_data(testbed_guid, name, ref_value)
833 elif fail_if_undefined:
834 raise ValueError, "Unresolvable netref in: %r" % (value,)
836 del self._testbed_netrefs[testbed_guid]
839 def _init_testbed_controllers(self, data, recover = False):
840 blacklist_testbeds = set(self._testbeds)
841 element_guids = list()
843 data_guids = data.guids
847 # gather label associations
848 for guid in data_guids:
849 if not data.is_testbed_data(guid):
850 (testbed_guid, factory_id) = data.get_box_data(guid)
851 label = data.get_attribute_data(guid, "label")
852 if label is not None:
853 if label in label_guids:
854 raise RuntimeError, "Label %r is not unique" % (label,)
855 label_guids[label] = guid
857 # create testbed controllers
858 for guid in data_guids:
859 if data.is_testbed_data(guid):
860 if guid not in self._testbeds:
862 self._create_testbed_controller(
863 guid, data, element_guids, recover)
866 blacklist_testbeds.add(guid)
871 policy = self._testbed_recovery_policy(guid, data=data)
872 if policy == DC.POLICY_RECOVER:
873 self._create_testbed_controller(
874 guid, data, element_guids, False)
876 elif policy == DC.POLICY_RESTART:
877 self._create_testbed_controller(
878 guid, data, element_guids, False)
882 self._failed_testbeds.add(guid)
886 # queue programmable elements
887 # - that have not been programmed already (blacklist_testbeds)
888 # - including recovered or restarted testbeds
889 # - but those that have no unresolved netrefs
890 for guid in data_guids:
891 if not data.is_testbed_data(guid):
892 (testbed_guid, factory_id) = data.get_box_data(guid)
893 if testbed_guid not in blacklist_testbeds:
894 element_guids.append(guid)
896 # replace references to elements labels for its guid
897 self._resolve_labels(data, data_guids, label_guids)
899 # program testbed controllers
901 self._program_testbed_controllers(element_guids, data)
903 return to_recover, to_restart
905 def _resolve_labels(self, data, data_guids, label_guids):
906 netrefs = self._netrefs
907 testbed_netrefs = self._testbed_netrefs
908 for guid in data_guids:
909 for name, value in data.get_attribute_data(guid):
910 if isinstance(value, basestring):
912 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
913 label = match.group("label")
914 if not label.startswith('GUID-'):
915 ref_guid = label_guids.get(label)
916 if ref_guid is not None:
917 value = value.replace(
919 ATTRIBUTE_PATTERN_GUID_SUB % dict(
920 guid = 'GUID-%d' % (ref_guid,),
921 expr = match.group("expr"),
924 data.set_attribute_data(guid, name, value)
926 # memorize which guid-attribute pairs require
927 # postprocessing, to avoid excessive controller-testbed
928 # communication at configuration time
929 # (which could require high-latency network I/O)
930 if not data.is_testbed_data(guid):
931 (testbed_guid, factory_id) = data.get_box_data(guid)
932 netrefs[(testbed_guid, guid)].add(name)
934 testbed_netrefs[guid].add(name)
940 def _create_testbed_controller(self, guid, data, element_guids, recover):
941 (testbed_id, testbed_version) = data.get_testbed_data(guid)
942 deployment_config = self._deployment_config.get(guid)
944 # deferred import because proxy needs
945 # our class definitions to define proxies
946 import nepi.util.proxy as proxy
948 if deployment_config is None:
950 deployment_config = proxy.AccessConfiguration()
952 for (name, value) in data.get_attribute_data(guid):
953 if value is not None and deployment_config.has_attribute(name):
954 # if any deployment config attribute has a netref, we can't
955 # create this controller yet
956 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
957 # remember to re-issue this one
958 self._netreffed_testbeds.add(guid)
961 # copy deployment config attribute
962 deployment_config.set_attribute_value(name, value)
965 self._deployment_config[guid] = deployment_config
967 if deployment_config is not None:
968 # force recovery mode
969 deployment_config.set_attribute_value("recover",recover)
971 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
973 for (name, value) in data.get_attribute_data(guid):
974 testbed.defer_configure(name, value)
975 self._testbeds[guid] = testbed
976 if guid in self._netreffed_testbeds:
977 self._netreffed_testbeds.remove(guid)
979 def _program_testbed_controllers(self, element_guids, data):
980 def resolve_create_netref(data, guid, name, value):
981 # Try to resolve create-time netrefs, if possible
982 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
984 nuvalue = self.resolve_netref_value(value)
986 # Any trouble means we're not in shape to resolve the netref yet
988 if nuvalue is not None:
989 # Only if we succeed we remove the netref deferral entry
991 data.set_attribute_data(guid, name, value)
992 if (testbed_guid, guid) in self._netrefs:
993 self._netrefs[(testbed_guid, guid)].discard(name)
996 for guid in element_guids:
997 (testbed_guid, factory_id) = data.get_box_data(guid)
998 testbed = self._testbeds.get(testbed_guid)
999 if testbed is not None:
1001 testbed.defer_create(guid, factory_id)
1003 for (name, value) in data.get_attribute_data(guid):
1004 value = resolve_create_netref(data, guid, name, value)
1005 testbed.defer_create_set(guid, name, value)
1007 for guid in element_guids:
1008 (testbed_guid, factory_id) = data.get_box_data(guid)
1009 testbed = self._testbeds.get(testbed_guid)
1010 if testbed is not None:
1012 for trace_id in data.get_trace_data(guid):
1013 testbed.defer_add_trace(guid, trace_id)
1015 for (address, netprefix, broadcast) in data.get_address_data(guid):
1017 testbed.defer_add_address(guid, address, netprefix,
1020 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
1021 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
1022 # store connections data
1023 for (connector_type_name, other_guid, other_connector_type_name) \
1024 in data.get_connection_data(guid):
1025 (other_testbed_guid, other_factory_id) = data.get_box_data(
1027 if testbed_guid == other_testbed_guid:
1028 # each testbed should take care of enforcing internal
1029 # connection simmetry, so each connection is only
1030 # added in one direction
1031 testbed.defer_connect(guid, connector_type_name,
1032 other_guid, other_connector_type_name)
1034 def _program_testbed_cross_connections(self, data):
1035 data_guids = data.guids
1036 for guid in data_guids:
1037 if not data.is_testbed_data(guid):
1038 (testbed_guid, factory_id) = data.get_box_data(guid)
1039 testbed = self._testbeds.get(testbed_guid)
1040 if testbed is not None:
1041 for (connector_type_name, cross_guid, cross_connector_type_name) \
1042 in data.get_connection_data(guid):
1043 (testbed_guid, factory_id) = data.get_box_data(guid)
1044 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1046 if testbed_guid != cross_testbed_guid:
1047 cross_testbed = self._testbeds[cross_testbed_guid]
1048 cross_testbed_id = cross_testbed.testbed_id
1049 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
1050 cross_testbed_guid, cross_testbed_id, cross_factory_id,
1051 cross_connector_type_name)
1052 # save cross data for later
1053 self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1054 (testbed_guid, guid, cross_testbed_guid, cross_guid))
1055 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1058 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1059 if testbed_guid not in self._cross_data:
1060 self._cross_data[testbed_guid] = dict()
1061 if cross_testbed_guid not in self._cross_data[testbed_guid]:
1062 self._cross_data[testbed_guid][cross_testbed_guid] = set()
1063 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1065 def _get_cross_data(self, testbed_guid):
1067 if not testbed_guid in self._cross_data:
1070 # fetch attribute lists in one batch
1071 attribute_lists = dict()
1072 for cross_testbed_guid, guid_list in \
1073 self._cross_data[testbed_guid].iteritems():
1074 cross_testbed = self._testbeds[cross_testbed_guid]
1075 for cross_guid in guid_list:
1076 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1077 cross_testbed.get_attribute_list_deferred(cross_guid)
1079 # fetch attribute values in another batch
1080 for cross_testbed_guid, guid_list in \
1081 self._cross_data[testbed_guid].iteritems():
1082 cross_data[cross_testbed_guid] = dict()
1083 cross_testbed = self._testbeds[cross_testbed_guid]
1084 for cross_guid in guid_list:
1085 elem_cross_data = dict(
1087 _testbed_guid = cross_testbed_guid,
1088 _testbed_id = cross_testbed.testbed_id,
1089 _testbed_version = cross_testbed.testbed_version)
1090 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1091 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1092 for attr_name in attribute_list:
1093 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1094 elem_cross_data[attr_name] = attr_value
1096 # undefer all values - we'll have to serialize them probably later
1097 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1098 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1099 for attr_name, attr_value in elem_cross_data.iteritems():
1100 elem_cross_data[attr_name] = _undefer(attr_value)
1104 class ExperimentSuite(object):
def __init__(self, experiment_xml, access_config, repetitions = None,
        duration = None, wait_guids = None):
    """Store the parameters of a suite of repeated experiment runs.

    experiment_xml: description of the experiment to run.
    access_config: access configuration used as template for every run.
    repetitions: number of runs; a missing (or zero) value means 1.
    duration: kept for the runs as the experiment duration.
    wait_guids: kept for the runs as the guids to wait on.
    """
    # what to run
    self._experiment_xml = experiment_xml
    self._access_config = access_config
    # how to run it
    self._repetitions = repetitions or 1
    self._duration = duration
    self._wait_guids = wait_guids
    # per-run bookkeeping, keyed by run number
    self._controllers = dict()
    self._access_configs = dict()
    self._current = None
    self._status = TS.STATUS_ZERO
1119 return self._current
def is_finished(self):
    """Tell whether the suite status is TS.STATUS_STOPPED."""
    return self._status == TS.STATUS_STOPPED
def get_access_configurations(self):
    """Return the access configurations stored so far, one per recorded run."""
    return self._access_configs.values()
1131 self._status = TS.STATUS_STARTED
1132 self._thread = threading.Thread(target = self._run_experiment_suite)
1133 self._thread.start()
1139 for controller in self._controllers.values():
1140 controller.shutdown()
def get_current_access_config(self):
    """Return the access configuration stored under the current run key.

    Raises KeyError if nothing is stored under self._current.
    """
    return self._access_configs[self._current]
def _run_experiment_suite(self):
    """Run the experiment self._repetitions times, then mark the suite
    as stopped.

    Runs are numbered from 1; the number of the run in progress is kept
    in self._current.
    """
    # the upper bound is exclusive: + 1 so that the last repetition runs
    # too (and the default of a single repetition runs at all)
    for i in xrange(1, self._repetitions + 1):
        self._current = i
        self._run_one_experiment()
    self._status = TS.STATUS_STOPPED
1151 def _run_one_experiment(self):
1152 from nepi.util import proxy
1153 access_config = proxy.AccessConfiguration()
1154 for attr in self._access_config.attributes:
1156 access_config.set_attribute_value(attr.name, attr.value)
1157 access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1158 root_dir = "%s_%d" % (
1159 access_config.get_attribute_value(DC.ROOT_DIRECTORY),
1161 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1162 controller = proxy.create_experiment_controller(self._experiment_xml,
1164 self._access_configs[self._current] = access_config
1165 self._controllers[self._current] = controller
1167 started_at = time.time()
1168 # wait until all specified guids have finished execution
1169 if self._wait_guids:
1170 while all(itertools.imap(controller.is_finished, self._wait_guids)):
1172 # wait until the minimum experiment duration time has elapsed
1174 while (time.time() - started_at) < self._duration: