2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
18 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
19 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
20 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
22 def _undefer(deferred):
23 if hasattr(deferred, '_get'):
24 return deferred._get()
29 class TestbedController(object):
30 def __init__(self, testbed_id, testbed_version):
31 self._testbed_id = testbed_id
32 self._testbed_version = testbed_version
36 return self._testbed_id
39 def testbed_version(self):
40 return self._testbed_version
44 raise NotImplementedError
46 def defer_configure(self, name, value):
47 """Instructs setting a configuartion attribute for the testbed instance"""
48 raise NotImplementedError
50 def defer_create(self, guid, factory_id):
51 """Instructs creation of element """
52 raise NotImplementedError
54 def defer_create_set(self, guid, name, value):
55 """Instructs setting an initial attribute on an element"""
56 raise NotImplementedError
58 def defer_factory_set(self, guid, name, value):
59 """Instructs setting an attribute on a factory"""
60 raise NotImplementedError
62 def defer_connect(self, guid1, connector_type_name1, guid2,
63 connector_type_name2):
64 """Instructs creation of a connection between the given connectors"""
65 raise NotImplementedError
67 def defer_cross_connect(self,
68 guid, connector_type_name,
69 cross_guid, cross_testbed_guid,
70 cross_testbed_id, cross_factory_id,
71 cross_connector_type_name):
73 Instructs creation of a connection between the given connectors
74 of different testbed instances
76 raise NotImplementedError
78 def defer_add_trace(self, guid, trace_id):
79 """Instructs the addition of a trace"""
80 raise NotImplementedError
82 def defer_add_address(self, guid, address, netprefix, broadcast):
83 """Instructs the addition of an address"""
84 raise NotImplementedError
86 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
87 """Instructs the addition of a route"""
88 raise NotImplementedError
91 """After do_setup the testbed initial configuration is done"""
92 raise NotImplementedError
96 After do_create all instructed elements are created and
99 raise NotImplementedError
101 def do_connect_init(self):
103 After do_connect_init all internal connections between testbed elements
106 raise NotImplementedError
108 def do_connect_compl(self):
110 After do_connect all internal connections between testbed elements
113 raise NotImplementedError
115 def do_preconfigure(self):
117 Done just before resolving netrefs, after connection, before cross connections,
118 useful for early stages of configuration, for setting up stuff that might be
119 required for netref resolution.
121 raise NotImplementedError
123 def do_configure(self):
124 """After do_configure elements are configured"""
125 raise NotImplementedError
127 def do_prestart(self):
128 """Before do_start elements are prestart-configured"""
129 raise NotImplementedError
131 def do_cross_connect_init(self, cross_data):
133 After do_cross_connect_init initiation of all external connections
134 between different testbed elements is performed
136 raise NotImplementedError
138 def do_cross_connect_compl(self, cross_data):
140 After do_cross_connect_compl completion of all external connections
141 between different testbed elements is performed
143 raise NotImplementedError
146 raise NotImplementedError
149 raise NotImplementedError
153 On testbed recovery (if recovery is a supported policy), the controller
154 instance will be re-created and the following sequence invoked:
157 defer_X - programming the testbed with persisted execution values
158 (not design values). Execution values (ExecImmutable attributes)
159 should be enough to recreate the testbed's state.
161 <cross-connection methods>
163 Start will not be called, and after cross connection invocations,
164 the testbed is supposed to be fully functional again.
166 raise NotImplementedError
168 def set(self, guid, name, value, time = TIME_NOW):
169 raise NotImplementedError
171 def get(self, guid, name, time = TIME_NOW):
172 raise NotImplementedError
174 def get_route(self, guid, index, attribute):
178 guid: guid of box to query
179 index: number of routing entry to fetch
180 attribute: one of Destination, NextHop, NetPrefix
182 raise NotImplementedError
184 def get_address(self, guid, index, attribute='Address'):
188 guid: guid of box to query
189 index: number of inteface to select
190 attribute: one of Address, NetPrefix, Broadcast
192 raise NotImplementedError
194 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
195 raise NotImplementedError
197 def get_factory_id(self, guid):
198 raise NotImplementedError
200 def action(self, time, guid, action):
201 raise NotImplementedError
203 def status(self, guid):
204 raise NotImplementedError
206 def testbed_status(self):
207 raise NotImplementedError
209 def trace(self, guid, trace_id, attribute='value'):
210 raise NotImplementedError
212 def traces_info(self):
213 """ dictionary of dictionaries:
219 filesize = size in bytes,
223 raise NotImplementedError
226 raise NotImplementedError
228 class ExperimentController(object):
229 def __init__(self, experiment_xml, root_dir):
230 self._experiment_design_xml = experiment_xml
231 self._experiment_execute_xml = None
232 self._testbeds = dict()
233 self._deployment_config = dict()
234 self._netrefs = collections.defaultdict(set)
235 self._testbed_netrefs = collections.defaultdict(set)
236 self._cross_data = dict()
237 self._root_dir = root_dir
238 self._netreffed_testbeds = set()
239 self._guids_in_testbed_cache = dict()
240 self._failed_testbeds = set()
241 self._started_time = None
242 self._stopped_time = None
244 self._logger = logging.getLogger('nepi.core.execute')
246 if experiment_xml is None and root_dir is not None:
248 self.load_experiment_xml()
249 self.load_execute_xml()
251 self.persist_experiment_xml()
254 def experiment_design_xml(self):
255 return self._experiment_design_xml
258 def experiment_execute_xml(self):
259 return self._experiment_execute_xml
262 def started_time(self):
263 return self._started_time
266 def stopped_time(self):
267 return self._stopped_time
272 for testbed_guid in self._testbeds.keys():
273 _guids = self._guids_in_testbed(testbed_guid)
278 def persist_experiment_xml(self):
279 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
280 f = open(xml_path, "w")
281 f.write(self._experiment_design_xml)
284 def persist_execute_xml(self):
285 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
286 f = open(xml_path, "w")
287 f.write(self._experiment_execute_xml)
290 def load_experiment_xml(self):
291 xml_path = os.path.join(self._root_dir, "experiment-design.xml")
292 f = open(xml_path, "r")
293 self._experiment_design_xml = f.read()
296 def load_execute_xml(self):
297 xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
298 f = open(xml_path, "r")
299 self._experiment_execute_xml = f.read()
302 def trace(self, guid, trace_id, attribute='value'):
303 testbed = self._testbed_for_guid(guid)
305 return testbed.trace(guid, trace_id, attribute)
306 raise RuntimeError("No element exists with guid %d" % guid)
308 def traces_info(self):
310 for guid, testbed in self._testbeds.iteritems():
311 tinfo = testbed.traces_info()
313 traces_info[guid] = testbed.traces_info()
317 def _parallel(callables):
320 @functools.wraps(callable)
321 def wrapped(*p, **kw):
325 logging.exception("Exception occurred in asynchronous thread:")
326 excs.append(sys.exc_info())
328 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
329 for thread in threads:
331 for thread in threads:
334 eTyp, eVal, eLoc = exc
335 raise eTyp, eVal, eLoc
338 self._started_time = time.time()
341 def _start(self, recover = False):
342 parser = XmlExperimentParser()
345 xml = self._experiment_execute_xml
347 xml = self._experiment_design_xml
348 data = parser.from_xml_to_data(xml)
350 # instantiate testbed controllers
351 to_recover, to_restart = self._init_testbed_controllers(data, recover)
352 all_restart = set(to_restart)
355 # persist testbed connection data, for potential recovery
356 self._persist_testbed_proxies()
358 # recover recoverable controllers
359 for guid in to_recover:
361 self._testbeds[guid].do_setup()
362 self._testbeds[guid].recover()
364 self._logger.exception("During recovery of testbed %s", guid)
367 self._failed_testbeds.add(guid)
369 def steps_to_configure(self, allowed_guids):
370 # perform setup in parallel for all test beds,
371 # wait for all threads to finish
372 self._parallel([testbed.do_setup
373 for guid,testbed in self._testbeds.iteritems()
374 if guid in allowed_guids])
376 # perform create-connect in parallel, wait
377 # (internal connections only)
378 self._parallel([testbed.do_create
379 for guid,testbed in self._testbeds.iteritems()
380 if guid in allowed_guids])
382 self._parallel([testbed.do_connect_init
383 for guid,testbed in self._testbeds.iteritems()
384 if guid in allowed_guids])
386 self._parallel([testbed.do_connect_compl
387 for guid,testbed in self._testbeds.iteritems()
388 if guid in allowed_guids])
390 self._parallel([testbed.do_preconfigure
391 for guid,testbed in self._testbeds.iteritems()
392 if guid in allowed_guids])
395 steps_to_configure(self, to_restart)
397 if self._netreffed_testbeds:
398 # initally resolve netrefs
399 self.do_netrefs(data, fail_if_undefined=False)
401 # rinse and repeat, for netreffed testbeds
402 netreffed_testbeds = set(self._netreffed_testbeds)
404 to_recover, to_restart = self._init_testbed_controllers(data, recover)
405 all_restart.update(to_restart)
408 # persist testbed connection data, for potential recovery
409 self._persist_testbed_proxies()
411 # recover recoverable controllers
412 for guid in to_recover:
414 self._testbeds[guid].do_setup()
415 self._testbeds[guid].recover()
417 self._logger.exception("During recovery of testbed %s", guid)
420 self._failed_testbeds.add(guid)
422 # configure dependant testbeds
423 steps_to_configure(self, to_restart)
425 all_restart = [ self._testbeds[guid] for guid in all_restart ]
427 # final netref step, fail if anything's left unresolved
428 self.do_netrefs(data, fail_if_undefined=False)
430 # Only now, that netref dependencies have been solve, it is safe to
431 # program cross_connections
432 self._program_testbed_cross_connections(data)
434 # perform do_configure in parallel for al testbeds
435 # (it's internal configuration for each)
436 self._parallel([testbed.do_configure
437 for testbed in all_restart])
441 #print >>sys.stderr, "DO IT"
445 # cross-connect (cannot be done in parallel)
446 for guid, testbed in self._testbeds.iteritems():
447 cross_data = self._get_cross_data(guid)
448 testbed.do_cross_connect_init(cross_data)
449 for guid, testbed in self._testbeds.iteritems():
450 cross_data = self._get_cross_data(guid)
451 testbed.do_cross_connect_compl(cross_data)
455 # Last chance to configure (parallel on all testbeds)
456 self._parallel([testbed.do_prestart
457 for testbed in all_restart])
459 # final netref step, fail if anything's left unresolved
460 self.do_netrefs(data, fail_if_undefined=True)
465 # update execution xml with execution-specific values
466 # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
467 self._update_execute_xml()
468 self.persist_execute_xml()
470 # start experiment (parallel start on all testbeds)
471 self._parallel([testbed.start
472 for testbed in all_restart])
476 def _clear_caches(self):
477 # Cleaning cache for safety.
478 self._guids_in_testbed_cache = dict()
480 def _persist_testbed_proxies(self):
481 TRANSIENT = (DC.RECOVER,)
483 # persist access configuration for all testbeds, so that
484 # recovery mode can reconnect to them if it becomes necessary
485 conf = ConfigParser.RawConfigParser()
486 for testbed_guid, testbed_config in self._deployment_config.iteritems():
487 testbed_guid = str(testbed_guid)
488 conf.add_section(testbed_guid)
489 for attr in testbed_config.get_attribute_list():
490 if attr not in TRANSIENT:
491 conf.set(testbed_guid, attr,
492 testbed_config.get_attribute_value(attr))
494 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
498 def _load_testbed_proxies(self):
500 Attribute.STRING : 'get',
501 Attribute.BOOL : 'getboolean',
502 Attribute.ENUM : 'get',
503 Attribute.DOUBLE : 'getfloat',
504 Attribute.INTEGER : 'getint',
507 TRANSIENT = (DC.RECOVER,)
509 # deferred import because proxy needs
510 # our class definitions to define proxies
511 import nepi.util.proxy as proxy
513 conf = ConfigParser.RawConfigParser()
514 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
515 for testbed_guid in conf.sections():
516 testbed_config = proxy.AccessConfiguration()
517 testbed_guid = str(testbed_guid)
518 for attr in testbed_config.get_attribute_list():
519 if attr not in TRANSIENT:
520 getter = getattr(conf, TYPEMAP.get(
521 testbed_config.get_attribute_type(attr),
523 testbed_config.set_attribute_value(
524 attr, getter(testbed_guid, attr))
526 def _unpersist_testbed_proxies(self):
528 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
530 # Just print exceptions, this is just cleanup
531 self._logger.exception("Loading testbed configuration")
533 def _update_execute_xml(self):
535 # For all elements in testbed,
536 # - gather immutable execute-readable attribuets lists
538 # Generate new design description from design xml
539 # (Wait for attributes lists - implicit syncpoint)
541 # For all elements in testbed,
542 # - gather all immutable execute-readable attribute
543 # values, asynchronously
544 # (Wait for attribute values - implicit syncpoint)
546 # For all elements in testbed,
547 # - inject non-None values into new design
548 # Generate execute xml from new design
550 attribute_lists = dict(
551 (testbed_guid, collections.defaultdict(dict))
552 for testbed_guid in self._testbeds
555 for testbed_guid, testbed in self._testbeds.iteritems():
556 guids = self._guids_in_testbed(testbed_guid)
558 attribute_lists[testbed_guid][guid] = \
559 testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
561 parser = XmlExperimentParser()
562 execute_data = parser.from_xml_to_data(self._experiment_design_xml)
564 attribute_values = dict(
565 (testbed_guid, collections.defaultdict(dict))
566 for testbed_guid in self._testbeds
569 for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
570 testbed = self._testbeds[testbed_guid]
571 for guid, attribute_list in testbed_attribute_lists.iteritems():
572 attribute_list = _undefer(attribute_list)
573 attribute_values[testbed_guid][guid] = dict(
574 (attribute, testbed.get_deferred(guid, attribute))
575 for attribute in attribute_list
578 for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
579 for guid, attribute_values in testbed_attribute_values.iteritems():
580 for attribute, value in attribute_values.iteritems():
581 value = _undefer(value)
582 if value is not None:
583 execute_data.add_attribute_data(guid, attribute, value)
585 self._experiment_execute_xml = parser.to_xml(data=execute_data)
588 for testbed in self._testbeds.values():
590 self._unpersist_testbed_proxies()
591 self._stopped_time = time.time()
594 # reload perviously persisted testbed access configurations
595 self._failed_testbeds.clear()
596 self._load_testbed_proxies()
598 # re-program testbeds that need recovery
599 self._start(recover = True)
601 def is_finished(self, guid):
602 testbed = self._testbed_for_guid(guid)
604 return testbed.status(guid) == AS.STATUS_FINISHED
605 raise RuntimeError("No element exists with guid %d" % guid)
607 def _testbed_recovery_policy(self, guid, data = None):
609 parser = XmlExperimentParser()
610 data = parser.from_xml_to_data(self._experiment_design_xml)
612 return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
614 def status(self, guid):
615 if guid in self._testbeds:
617 # report testbed status
618 if guid in self._failed_testbeds:
619 return TS.STATUS_FAILED
622 return self._testbeds[guid].status()
624 return TS.STATUS_UNRESPONSIVE
627 testbed = self._testbed_for_guid(guid)
628 if testbed is not None:
629 return testbed.status(guid)
631 return AS.STATUS_UNDETERMINED
633 def set(self, guid, name, value, time = TIME_NOW):
634 testbed = self._testbed_for_guid(guid)
636 testbed.set(guid, name, value, time)
638 raise RuntimeError("No element exists with guid %d" % guid)
640 def get(self, guid, name, time = TIME_NOW):
641 testbed = self._testbed_for_guid(guid)
643 return testbed.get(guid, name, time)
644 raise RuntimeError("No element exists with guid %d" % guid)
646 def get_deferred(self, guid, name, time = TIME_NOW):
647 testbed = self._testbed_for_guid(guid)
649 return testbed.get_deferred(guid, name, time)
650 raise RuntimeError("No element exists with guid %d" % guid)
652 def get_factory_id(self, guid):
653 testbed = self._testbed_for_guid(guid)
655 return testbed.get_factory_id(guid)
656 raise RuntimeError("No element exists with guid %d" % guid)
658 def get_testbed_id(self, guid):
659 testbed = self._testbed_for_guid(guid)
661 return testbed.testbed_id
662 raise RuntimeError("No element exists with guid %d" % guid)
664 def get_testbed_version(self, guid):
665 testbed = self._testbed_for_guid(guid)
667 return testbed.testbed_version
668 raise RuntimeError("No element exists with guid %d" % guid)
672 for testbed in self._testbeds.values():
676 exceptions.append(sys.exc_info())
677 for exc_info in exceptions:
678 raise exc_info[0], exc_info[1], exc_info[2]
680 def _testbed_for_guid(self, guid):
681 for testbed_guid in self._testbeds.keys():
682 if guid in self._guids_in_testbed(testbed_guid):
683 if testbed_guid in self._failed_testbeds:
685 return self._testbeds[testbed_guid]
688 def _guids_in_testbed(self, testbed_guid):
689 if testbed_guid not in self._testbeds:
691 if testbed_guid not in self._guids_in_testbed_cache:
692 self._guids_in_testbed_cache[testbed_guid] = \
693 set(self._testbeds[testbed_guid].guids)
694 return self._guids_in_testbed_cache[testbed_guid]
697 def _netref_component_split(component):
698 match = COMPONENT_PATTERN.match(component)
700 return match.group("kind"), match.group("index")
702 return component, None
704 _NETREF_COMPONENT_GETTERS = {
706 lambda testbed, guid, index, name:
707 testbed.get_address(guid, int(index), name),
709 lambda testbed, guid, index, name:
710 testbed.get_route(guid, int(index), name),
712 lambda testbed, guid, index, name:
713 testbed.trace(guid, index, attribute = name),
715 lambda testbed, guid, index, name:
716 testbed.get(guid, name),
719 def resolve_netref_value(self, value, failval = None):
722 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
723 label = match.group("label")
724 if label.startswith('GUID-'):
725 ref_guid = int(label[5:])
727 expr = match.group("expr")
728 component = (match.group("component") or "")[1:] # skip the dot
729 attribute = match.group("attribute")
731 # split compound components into component kind and index
732 # eg: 'addr[0]' -> ('addr', '0')
733 component, component_index = self._netref_component_split(component)
735 # find object and resolve expression
736 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
737 if component not in self._NETREF_COMPONENT_GETTERS:
738 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
739 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
742 ref_value = self._NETREF_COMPONENT_GETTERS[component](
743 ref_testbed, ref_guid, component_index, attribute)
745 value = rv = value.replace(match.group(), ref_value)
748 # unresolvable netref
755 def do_netrefs(self, data, fail_if_undefined = False):
757 for (testbed_guid, guid), attrs in self._netrefs.items():
758 testbed = self._testbeds.get(testbed_guid)
759 if testbed is not None:
760 for name in set(attrs):
761 value = testbed.get(guid, name)
762 if isinstance(value, basestring):
763 ref_value = self.resolve_netref_value(value)
764 if ref_value is not None:
765 testbed.set(guid, name, ref_value)
767 elif fail_if_undefined:
768 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
770 del self._netrefs[(testbed_guid, guid)]
773 for testbed_guid, attrs in self._testbed_netrefs.items():
774 tb_data = dict(data.get_attribute_data(testbed_guid))
776 for name in set(attrs):
777 value = tb_data.get(name)
778 if isinstance(value, basestring):
779 ref_value = self.resolve_netref_value(value)
780 if ref_value is not None:
781 data.set_attribute_data(testbed_guid, name, ref_value)
783 elif fail_if_undefined:
784 raise ValueError, "Unresolvable netref in: %r" % (value,)
786 del self._testbed_netrefs[testbed_guid]
789 def _init_testbed_controllers(self, data, recover = False):
790 blacklist_testbeds = set(self._testbeds)
791 element_guids = list()
793 data_guids = data.guids
797 # gather label associations
798 for guid in data_guids:
799 if not data.is_testbed_data(guid):
800 (testbed_guid, factory_id) = data.get_box_data(guid)
801 label = data.get_attribute_data(guid, "label")
802 if label is not None:
803 if label in label_guids:
804 raise RuntimeError, "Label %r is not unique" % (label,)
805 label_guids[label] = guid
807 # create testbed controllers
808 for guid in data_guids:
809 if data.is_testbed_data(guid):
810 if guid not in self._testbeds:
812 self._create_testbed_controller(
813 guid, data, element_guids, recover)
816 blacklist_testbeds.add(guid)
821 policy = self._testbed_recovery_policy(guid, data=data)
822 if policy == DC.POLICY_RECOVER:
823 self._create_testbed_controller(
824 guid, data, element_guids, False)
826 elif policy == DC.POLICY_RESTART:
827 self._create_testbed_controller(
828 guid, data, element_guids, False)
832 self._failed_testbeds.add(guid)
836 # queue programmable elements
837 # - that have not been programmed already (blacklist_testbeds)
838 # - including recovered or restarted testbeds
839 # - but those that have no unresolved netrefs
840 for guid in data_guids:
841 if not data.is_testbed_data(guid):
842 (testbed_guid, factory_id) = data.get_box_data(guid)
843 if testbed_guid not in blacklist_testbeds:
844 element_guids.append(guid)
846 # replace references to elements labels for its guid
847 self._resolve_labels(data, data_guids, label_guids)
849 # program testbed controllers
851 self._program_testbed_controllers(element_guids, data)
853 return to_recover, to_restart
855 def _resolve_labels(self, data, data_guids, label_guids):
856 netrefs = self._netrefs
857 testbed_netrefs = self._testbed_netrefs
858 for guid in data_guids:
859 for name, value in data.get_attribute_data(guid):
860 if isinstance(value, basestring):
862 for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
863 label = match.group("label")
864 if not label.startswith('GUID-'):
865 ref_guid = label_guids.get(label)
866 if ref_guid is not None:
867 value = value.replace(
869 ATTRIBUTE_PATTERN_GUID_SUB % dict(
870 guid = 'GUID-%d' % (ref_guid,),
871 expr = match.group("expr"),
874 data.set_attribute_data(guid, name, value)
876 # memorize which guid-attribute pairs require
877 # postprocessing, to avoid excessive controller-testbed
878 # communication at configuration time
879 # (which could require high-latency network I/O)
880 if not data.is_testbed_data(guid):
881 (testbed_guid, factory_id) = data.get_box_data(guid)
882 netrefs[(testbed_guid, guid)].add(name)
884 testbed_netrefs[guid].add(name)
890 def _create_testbed_controller(self, guid, data, element_guids, recover):
891 (testbed_id, testbed_version) = data.get_testbed_data(guid)
892 deployment_config = self._deployment_config.get(guid)
894 # deferred import because proxy needs
895 # our class definitions to define proxies
896 import nepi.util.proxy as proxy
898 if deployment_config is None:
900 deployment_config = proxy.AccessConfiguration()
902 for (name, value) in data.get_attribute_data(guid):
903 if value is not None and deployment_config.has_attribute(name):
904 # if any deployment config attribute has a netref, we can't
905 # create this controller yet
906 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
907 # remember to re-issue this one
908 self._netreffed_testbeds.add(guid)
911 # copy deployment config attribute
912 deployment_config.set_attribute_value(name, value)
915 self._deployment_config[guid] = deployment_config
917 if deployment_config is not None:
918 # force recovery mode
919 deployment_config.set_attribute_value("recover",recover)
921 testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
923 for (name, value) in data.get_attribute_data(guid):
924 testbed.defer_configure(name, value)
925 self._testbeds[guid] = testbed
926 if guid in self._netreffed_testbeds:
927 self._netreffed_testbeds.remove(guid)
929 def _program_testbed_controllers(self, element_guids, data):
930 def resolve_create_netref(data, guid, name, value):
931 # Try to resolve create-time netrefs, if possible
932 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
934 nuvalue = self.resolve_netref_value(value)
936 # Any trouble means we're not in shape to resolve the netref yet
938 if nuvalue is not None:
939 # Only if we succeed we remove the netref deferral entry
941 data.set_attribute_data(guid, name, value)
942 if (testbed_guid, guid) in self._netrefs:
943 self._netrefs[(testbed_guid, guid)].discard(name)
946 for guid in element_guids:
947 (testbed_guid, factory_id) = data.get_box_data(guid)
948 testbed = self._testbeds.get(testbed_guid)
949 if testbed is not None:
951 testbed.defer_create(guid, factory_id)
953 for (name, value) in data.get_attribute_data(guid):
954 value = resolve_create_netref(data, guid, name, value)
955 testbed.defer_create_set(guid, name, value)
957 for guid in element_guids:
958 (testbed_guid, factory_id) = data.get_box_data(guid)
959 testbed = self._testbeds.get(testbed_guid)
960 if testbed is not None:
962 for trace_id in data.get_trace_data(guid):
963 testbed.defer_add_trace(guid, trace_id)
965 for (address, netprefix, broadcast) in data.get_address_data(guid):
967 testbed.defer_add_address(guid, address, netprefix,
970 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
971 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
972 # store connections data
973 for (connector_type_name, other_guid, other_connector_type_name) \
974 in data.get_connection_data(guid):
975 (other_testbed_guid, other_factory_id) = data.get_box_data(
977 if testbed_guid == other_testbed_guid:
978 # each testbed should take care of enforcing internal
979 # connection simmetry, so each connection is only
980 # added in one direction
981 testbed.defer_connect(guid, connector_type_name,
982 other_guid, other_connector_type_name)
984 def _program_testbed_cross_connections(self, data):
985 data_guids = data.guids
986 for guid in data_guids:
987 if not data.is_testbed_data(guid):
988 (testbed_guid, factory_id) = data.get_box_data(guid)
989 testbed = self._testbeds.get(testbed_guid)
990 if testbed is not None:
991 for (connector_type_name, cross_guid, cross_connector_type_name) \
992 in data.get_connection_data(guid):
993 (testbed_guid, factory_id) = data.get_box_data(guid)
994 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
996 if testbed_guid != cross_testbed_guid:
997 cross_testbed = self._testbeds[cross_testbed_guid]
998 cross_testbed_id = cross_testbed.testbed_id
999 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
1000 cross_testbed_guid, cross_testbed_id, cross_factory_id,
1001 cross_connector_type_name)
1002 # save cross data for later
1003 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1006 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1007 if testbed_guid not in self._cross_data:
1008 self._cross_data[testbed_guid] = dict()
1009 if cross_testbed_guid not in self._cross_data[testbed_guid]:
1010 self._cross_data[testbed_guid][cross_testbed_guid] = set()
1011 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1013 def _get_cross_data(self, testbed_guid):
1015 if not testbed_guid in self._cross_data:
1018 # fetch attribute lists in one batch
1019 attribute_lists = dict()
1020 for cross_testbed_guid, guid_list in \
1021 self._cross_data[testbed_guid].iteritems():
1022 cross_testbed = self._testbeds[cross_testbed_guid]
1023 for cross_guid in guid_list:
1024 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1025 cross_testbed.get_attribute_list_deferred(cross_guid)
1027 # fetch attribute values in another batch
1028 for cross_testbed_guid, guid_list in \
1029 self._cross_data[testbed_guid].iteritems():
1030 cross_data[cross_testbed_guid] = dict()
1031 cross_testbed = self._testbeds[cross_testbed_guid]
1032 for cross_guid in guid_list:
1033 elem_cross_data = dict(
1035 _testbed_guid = cross_testbed_guid,
1036 _testbed_id = cross_testbed.testbed_id,
1037 _testbed_version = cross_testbed.testbed_version)
1038 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1039 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1040 for attr_name in attribute_list:
1041 attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1042 elem_cross_data[attr_name] = attr_value
1044 # undefer all values - we'll have to serialize them probably later
1045 for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1046 for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1047 for attr_name, attr_value in elem_cross_data.iteritems():
1048 elem_cross_data[attr_name] = _undefer(attr_value)
1052 class ExperimentSuite(object):
1053 def __init__(self, experiment_xml, access_config, repetitions,
1054 duration, wait_guids):
1055 self._experiment_xml = experiment_xml
1056 self._access_config = access_config
1057 self._experiments = dict()
1058 self._repetitions = repetitions
1059 self._duration = duration
1060 self._wait_guids = wait_guids
1061 self._current = None
1062 self._status = TS.STATUS_ZERO
1066 self._status = TS.STATUS_STARTED
1067 self._thread = threading.Thread(target = self._run_experiment_suite)
1068 self._thread.start()
1075 def _run_experiment_suite(self):
1076 for i in xrange[0, self.repetitions]:
1078 self._run_one_experiment()
1080 def _run_one_experiment(self):
1081 access_config = proxy.AccessConfiguration()
1082 for attr in self._access_config.attributes:
1083 access_config.set_attribute_value(attr.name, attr.value)
1084 access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1085 root_dir = "%s_%d" % (
1086 access_config.get_attribute_value(DC.ROOT_DIRECTORY),
1088 access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1089 controller = proxy.create_experiment_controller(self._experiment_xml,
1091 self._experiments[self._current] = controller
1093 started_at = time.time()
1094 # wait until all specified guids have finished execution
1095 if self._wait_guids:
1096 while all(itertools.imap(controller.is_finished, self._wait_guids):
1098 # wait until the minimum experiment duration time has elapsed
1100 while (time.time() - started_at) < self._duration:
1104 controller.shutdown()