2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.core.connector import ConnectorTypeBase
6 from nepi.util import proxy, validation
7 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
8 from nepi.util.parser._xml import XmlExperimentParser
# Matches a netref of the form ``{#[<label>]<expr>#}``, where <expr> is an
# optional component (``.addr[N]``, ``.route[N]`` or ``.trace[N]``) followed
# by ``.[<attribute>]``.  Named groups: label, expr, component, attribute.
# The dot in front of ``[<attribute>]`` is matched literally.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# Template used to rebuild a netref whose label has been replaced by a guid.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as ``addr[0]`` into its kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
class ConnectorType(ConnectorTypeBase):
    """Connector type that records which peer connector types it accepts.

    Relies on ``make_connector_type_id`` and ``_type_resolution_order``,
    which are not defined here (presumably provided by ConnectorTypeBase).
    """

    def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
        super(ConnectorType, self).__init__(testbed_id, factory_id, name, max, min)
        # from_connections -- connections where the other connector is the "From"
        # to_connections -- connections where the other connector is the "To"
        # Keys are the connector_type_id of the possible peer connector.
        # Each value is a tuple (can_cross, init_code, compl_code):
        #   can_cross: whether the connection is allowed across different
        #       testbed instances
        #   init_code / compl_code: the code objects registered for the
        #       initiation and completion steps of the connection
        self._from_connections = dict()
        self._to_connections = dict()

    def add_from_connection(self, testbed_id, factory_id, name, can_cross,
            init_code, compl_code):
        """Register a peer connector type that acts as the "From" side."""
        type_id = self.make_connector_type_id(testbed_id, factory_id, name)
        self._from_connections[type_id] = (can_cross, init_code, compl_code)

    def add_to_connection(self, testbed_id, factory_id, name, can_cross,
            init_code, compl_code):
        """Register a peer connector type that acts as the "To" side."""
        type_id = self.make_connector_type_id(testbed_id, factory_id, name)
        self._to_connections[type_id] = (can_cross, init_code, compl_code)

    def can_connect(self, testbed_id, factory_id, name, count,
            must_cross):
        """Tell whether a connection to the given peer connector is allowed.

        The first id in the resolution order that is registered (from-table
        first, then to-table) decides: the connection is allowed unless it
        must cross testbeds and that entry forbids crossing.  ``count`` is
        accepted but not consulted.  Returns False when no id is registered.
        """
        connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
        for lookup_type_id in self._type_resolution_order(connector_type_id):
            if lookup_type_id in self._from_connections:
                can_cross = self._from_connections[lookup_type_id][0]
            elif lookup_type_id in self._to_connections:
                can_cross = self._to_connections[lookup_type_id][0]
            else:
                # not registered under this id, keep trying
                continue
            return not must_cross or can_cross
        return False

    def _connect_to_code(self, testbed_id, factory_id, name):
        """Return (init_code, compl_code) for connecting to the given peer.

        Only the to-table is consulted.  Returns (False, False) when nothing
        is registered, so callers can safely index the result.
        """
        connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
        for lookup_type_id in self._type_resolution_order(connector_type_id):
            if lookup_type_id in self._to_connections:
                entry = self._to_connections[lookup_type_id]
                return (entry[1], entry[2])
        return (False, False)

    def connect_to_init_code(self, testbed_id, factory_id, name):
        """Return the initiation code for connecting to the given peer."""
        return self._connect_to_code(testbed_id, factory_id, name)[0]

    def connect_to_compl_code(self, testbed_id, factory_id, name):
        """Return the completion code for connecting to the given peer."""
        return self._connect_to_code(testbed_id, factory_id, name)[1]
class Factory(AttributesMap):
    """Describes how one kind of testbed element is created and driven.

    Holds the lifecycle callbacks, the address/route capabilities, the
    connector types, the trace ids and the box attributes of the element
    kind identified by ``factory_id``.
    """

    def __init__(self, factory_id, create_function, start_function,
            stop_function, status_function,
            configure_function, preconfigure_function,
            allow_addresses = False, has_addresses = False,
            allow_routes = False, has_routes = False):
        super(Factory, self).__init__()
        self._factory_id = factory_id
        self._allow_addresses = bool(allow_addresses)
        self._allow_routes = bool(allow_routes)
        # allowing addresses/routes implies having them
        self._has_addresses = bool(has_addresses) or self._allow_addresses
        self._has_routes = bool(has_routes) or self._allow_routes
        self._create_function = create_function
        self._start_function = start_function
        self._stop_function = stop_function
        self._status_function = status_function
        self._configure_function = configure_function
        self._preconfigure_function = preconfigure_function
        self._connector_types = dict()
        # must exist before add_trace() is used
        self._traces = list()
        self._box_attributes = AttributesMap()

    @property
    def factory_id(self):
        return self._factory_id

    @property
    def allow_addresses(self):
        return self._allow_addresses

    @property
    def allow_routes(self):
        return self._allow_routes

    @property
    def has_addresses(self):
        return self._has_addresses

    @property
    def has_routes(self):
        return self._has_routes

    @property
    def box_attributes(self):
        return self._box_attributes

    @property
    def create_function(self):
        return self._create_function

    @property
    def start_function(self):
        return self._start_function

    @property
    def stop_function(self):
        return self._stop_function

    @property
    def status_function(self):
        return self._status_function

    @property
    def configure_function(self):
        return self._configure_function

    @property
    def preconfigure_function(self):
        return self._preconfigure_function

    @property
    def traces(self):
        """Trace ids registered through add_trace, in registration order."""
        return self._traces

    def connector_type(self, name):
        """Return the connector type registered under ``name`` (KeyError if none)."""
        return self._connector_types[name]

    def add_connector_type(self, connector_type):
        """Register ``connector_type`` under its own ``name``."""
        self._connector_types[connector_type.name] = connector_type

    def add_trace(self, trace_id):
        """Register a trace id for this factory."""
        self._traces.append(trace_id)

    def add_box_attribute(self, name, help, type, value = None, range = None,
            allowed = None, flags = Attribute.NoFlags, validation_function = None):
        """Declare a box attribute; arguments are forwarded unchanged to the
        box attributes map."""
        self._box_attributes.add_attribute(name, help, type, value, range,
                allowed, flags, validation_function)
class TestbedController(object):
    """Abstract interface of a testbed instance controller.

    Every operation raises NotImplementedError here; concrete testbeds are
    expected to override them.  The ``defer_*`` methods record instructions,
    the ``do_*`` methods carry out one deployment phase each.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        return self._testbed_id

    @property
    def testbed_version(self):
        return self._testbed_version

    @property
    def guids(self):
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element """
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, guid, connector_type_name, cross_guid,
            cross_testbed_id, cross_factory_id, cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors
        of different testbed instances

        NOTE(review): ExperimentController calls this with an additional
        cross testbed guid argument placed before cross_testbed_id, so the
        two signatures disagree -- confirm which one is intended.
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and
        their initial attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed elements
        are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Phase run by the experiment controller after do_connect_compl and
        before do_configure
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self, time = TIME_NOW):
        raise NotImplementedError

    def stop(self, time = TIME_NOW):
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid):
        raise NotImplementedError

    def action(self, time, guid, action):
        raise NotImplementedError

    def status(self, guid):
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        raise NotImplementedError

    def shutdown(self):
        raise NotImplementedError
312 class ExperimentController(object):
def __init__(self, experiment_xml, root_dir):
    """Create a controller for the experiment described by experiment_xml.

    experiment_xml: the experiment description, as XML text.
    root_dir: directory under which the controller persists its files.

    The experiment XML is persisted right away.
    """
    self._root_dir = root_dir
    self._experiment_xml = experiment_xml

    # testbed guid -> testbed controller / deployment configuration
    self._testbeds = {}
    self._deployment_config = {}

    # attribute names still holding netrefs, keyed by
    # (testbed guid, element guid) and by testbed guid respectively
    self._netrefs = collections.defaultdict(set)
    self._testbed_netrefs = collections.defaultdict(set)

    # testbeds whose creation had to be postponed because of netrefs
    self._netreffed_testbeds = set()

    # testbed guid -> cross testbed guid -> cross element guids
    self._cross_data = {}

    self.persist_experiment_xml()
@property
def experiment_xml(self):
    """The experiment description XML this controller was created with."""
    xml = self._experiment_xml
    return xml
def persist_experiment_xml(self):
    """Write the experiment XML to ``experiment.xml`` in the root directory.

    An existing file is overwritten.  The file is closed even if the
    write fails.
    """
    xml_path = os.path.join(self._root_dir, "experiment.xml")
    f = open(xml_path, "w")
    try:
        f.write(self._experiment_xml)
    finally:
        f.close()
def trace(self, testbed_guid, guid, trace_id, attribute='value'):
    """Fetch a trace attribute of an element from the testbed that owns it.

    Raises KeyError when no controller exists for testbed_guid.
    """
    testbed = self._testbeds[testbed_guid]
    return testbed.trace(guid, trace_id, attribute)
@staticmethod
def _parallel(callables):
    """Run every callable in its own thread and wait for all of them.

    callables: iterable of zero-argument callables.

    A failure inside a worker is printed to stderr and remembered; once
    every thread has been joined, the first remembered exception is raised
    so the caller does not carry on after a failed phase.
    """
    failures = []

    def wrap(func):
        @functools.wraps(func)
        def wrapped(*p, **kw):
            try:
                func(*p, **kw)
            except Exception:
                import traceback
                traceback.print_exc(file=sys.stderr)
                failures.append(sys.exc_info()[1])
        return wrapped

    threads = [threading.Thread(target=wrap(func)) for func in callables]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    if failures:
        raise failures[0]
def start(self):
    """Deploy the experiment described by the experiment XML and start it.

    Creates and programs the testbed controllers, runs the deployment
    phases on them, resolves netrefs, wires cross-testbed connections and
    finally starts every testbed.
    """
    data = XmlExperimentParser().from_xml_to_data(self._experiment_xml)

    self._init_testbed_controllers(data)

    # persist testbed connection data, for potential recovery
    self._persist_testbed_proxies()

    # phases run, in this order, before a testbed can be configured;
    # each one runs in parallel on the selected testbeds and is waited
    # for before the next one begins (internal connections only)
    phases = (
        "do_setup",
        "do_create",
        "do_connect_init",
        "do_connect_compl",
        "do_preconfigure",
    )

    def steps_to_configure(self, allowed_guids):
        for phase in phases:
            self._parallel([
                getattr(testbed, phase)
                for guid, testbed in self._testbeds.iteritems()
                if guid in allowed_guids])

    steps_to_configure(self, self._testbeds)

    if self._netreffed_testbeds:
        # initially resolve netrefs
        self.do_netrefs(data, fail_if_undefined=False)

        # rinse and repeat, for netreffed testbeds
        netreffed_testbeds = set(self._netreffed_testbeds)

        self._init_testbed_controllers(data)

        # persist testbed connection data, for potential recovery
        self._persist_testbed_proxies()

        # configure dependent testbeds
        steps_to_configure(self, netreffed_testbeds)

    # final netref step, fail if anything's left unresolved
    self.do_netrefs(data, fail_if_undefined=True)

    self._program_testbed_cross_connections(data)

    # perform do_configure in parallel for all testbeds
    # (it's internal configuration for each)
    self._parallel([
        testbed.do_configure
        for testbed in self._testbeds.itervalues()])

    # cross-connect (cannot be done in parallel): every testbed initiates
    # its connections before any of them completes
    for guid, testbed in self._testbeds.iteritems():
        testbed.do_cross_connect_init(self._get_cross_data(guid))
    for guid, testbed in self._testbeds.iteritems():
        testbed.do_cross_connect_compl(self._get_cross_data(guid))

    # start experiment (parallel start on all testbeds)
    self._parallel([
        testbed.start
        for testbed in self._testbeds.itervalues()])
def _persist_testbed_proxies(self):
    """Save every testbed's access configuration to ``deployment_config.ini``.

    One section is written per testbed guid, holding all of its
    configuration attributes except the transient ones.  The file lives in
    the root directory and is closed even if writing fails.
    """
    TRANSIENT = ('Recover',)

    # persist access configuration for all testbeds, so that
    # recovery mode can reconnect to them if it becomes necessary
    conf = ConfigParser.RawConfigParser()
    for testbed_guid, testbed_config in self._deployment_config.iteritems():
        section = str(testbed_guid)
        conf.add_section(section)
        for attr in testbed_config.attributes_list:
            if attr not in TRANSIENT:
                conf.set(section, attr,
                    testbed_config.get_attribute_value(attr))

    f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
    try:
        conf.write(f)
    finally:
        f.close()
450 def _load_testbed_proxies(self):
455 BOOLEAN : 'getboolean',
458 conf = ConfigParser.RawConfigParser()
459 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
460 for testbed_guid in conf.sections():
461 testbed_config = proxy.AccessConfiguration()
462 for attr in conf.options(testbed_guid):
463 testbed_config.set_attribute_value(attr,
464 conf.get(testbed_guid, attr) )
466 testbed_guid = str(testbed_guid)
467 conf.add_section(testbed_guid)
468 for attr in testbed_config.attributes_list:
469 if attr not in TRANSIENT:
470 getter = getattr(conf, TYPEMAP.get(
471 testbed_config.get_attribute_type(attr),
473 testbed_config.set_attribute_value(
474 testbed_guid, attr, getter(attr))
def _unpersist_testbed_proxies(self):
    """Remove ``deployment_config.ini`` from the root directory.

    This is best-effort cleanup: a failure (for instance, the file not
    being there) is printed to stderr and otherwise ignored.
    """
    try:
        os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
    except Exception:
        # Just print exceptions, this is just cleanup
        import traceback
        traceback.print_exc(file=sys.stderr)
def stop(self):
    """Stop every testbed, then discard the persisted access configuration."""
    testbeds = self._testbeds.values()
    for testbed in testbeds:
        testbed.stop()
    self._unpersist_testbed_proxies()
def recover(self):
    """Reattach to the testbeds of a previously started experiment.

    The persisted access configurations are reloaded and the experiment
    XML is parsed again, because building the controllers requires the
    experiment data.  Controllers are created in recovery mode, so they
    are not programmed again.
    """
    # reload previously persisted testbed access configurations
    self._load_testbed_proxies()

    # the experiment data is needed to recreate the controllers
    parser = XmlExperimentParser()
    data = parser.from_xml_to_data(self._experiment_xml)

    # recreate testbed proxies by reconnecting only
    self._init_testbed_controllers(data, recover = True)

    # another time, for netrefs
    self._init_testbed_controllers(data, recover = True)
def is_finished(self, guid):
    """Tell whether the element with the given guid has finished.

    The first testbed listing the guid among its guids is asked for the
    element's status.  Raises RuntimeError when no testbed knows the guid.
    """
    for testbed in self._testbeds.values():
        if any(known_guid == guid for known_guid in testbed.guids):
            return testbed.status(guid) == STATUS_FINISHED
    raise RuntimeError("No element exists with guid %d" % guid)
def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
    """Set an attribute of an element through the testbed that owns it.

    Raises KeyError when no controller exists for testbed_guid.
    """
    self._testbeds[testbed_guid].set(guid, name, value, time)
def get(self, testbed_guid, guid, name, time = TIME_NOW):
    """Read an attribute of an element through the testbed that owns it.

    Raises KeyError when no controller exists for testbed_guid.
    """
    owner = self._testbeds[testbed_guid]
    value = owner.get(guid, name, time)
    return value
def shutdown(self):
    """Shut down every testbed controller."""
    testbeds = self._testbeds.values()
    for testbed in testbeds:
        testbed.shutdown()
@staticmethod
def _netref_component_split(component):
    """Split a compound netref component into its kind and index.

    eg: 'addr[0]' -> ('addr', '0').  A component that does not have that
    shape is returned unchanged, paired with None.
    """
    match = COMPONENT_PATTERN.match(component)
    if not match:
        return component, None
    return match.group("kind"), match.group("index")
# Maps a netref component kind to the function that fetches the referenced
# value.  Every getter takes (testbed, guid, index, name); the empty kind is
# a plain attribute lookup and ignores the index.
_NETREF_COMPONENT_GETTERS = dict([
    ('addr',
        lambda testbed, guid, index, name:
            testbed.get_address(guid, index, name)),
    ('route',
        lambda testbed, guid, index, name:
            testbed.get_route(guid, index, name)),
    ('trace',
        lambda testbed, guid, index, name:
            testbed.trace(guid, index, name)),
    ('',
        lambda testbed, guid, index, name:
            testbed.get(guid, name)),
])
def resolve_netref_value(self, value):
    """Replace the first GUID-based netref found in value by what it refers to.

    Returns the new string, or None when value holds no netref, when the
    netref is not in GUID form yet, or when no testbed yields a (truthy)
    value for it.  Raises ValueError for an unknown component kind.
    """
    match = ATTRIBUTE_PATTERN_BASE.search(value)
    if not match:
        return None

    label = match.group("label")
    if not label.startswith('GUID-'):
        return None

    ref_guid = int(label[5:])
    if not ref_guid:
        return None

    expr = match.group("expr")
    component = (match.group("component") or "")[1:] # skip the dot
    attribute = match.group("attribute")

    # split compound components into component kind and index
    # eg: 'addr[0]' -> ('addr', '0')
    component, component_index = self._netref_component_split(component)

    # find object and resolve expression
    for ref_testbed in self._testbeds.itervalues():
        if component not in self._NETREF_COMPONENT_GETTERS:
            raise ValueError("Malformed netref: %r - unknown component" % (expr,))
        getter = self._NETREF_COMPONENT_GETTERS[component]
        ref_value = getter(ref_testbed, ref_guid, component_index, attribute)
        if ref_value:
            return value.replace(match.group(), ref_value)

    # couldn't find value
    return None
def do_netrefs(self, data, fail_if_undefined = False):
    """Resolve the memorized netrefs of elements and testbeds.

    data: experiment data; testbed attributes are read from and written
        back to it.
    fail_if_undefined: raise ValueError for a reference that cannot be
        resolved, instead of leaving it for a later pass.

    Only references that got resolved (or whose value is not a string, and
    so cannot hold a reference) are forgotten.  Unresolved ones stay
    memorized so a later pass can retry them or report them.  Elements
    whose testbed controller does not exist yet are skipped and stay
    memorized too.
    """
    # element netrefs
    for key, attrs in list(self._netrefs.items()):
        (testbed_guid, guid) = key
        testbed = self._testbeds.get(testbed_guid)
        if testbed is None:
            # controller not created yet, retry on a later pass
            continue
        for name in list(attrs):
            value = testbed.get(guid, name)
            if isinstance(value, basestring):
                ref_value = self.resolve_netref_value(value)
                if ref_value is not None:
                    testbed.set(guid, name, ref_value)
                    attrs.discard(name)
                elif fail_if_undefined:
                    raise ValueError("Unresolvable netref in: %r" % (value,))
            else:
                attrs.discard(name)
        if not attrs:
            del self._netrefs[key]

    # testbed netrefs
    for testbed_guid, attrs in list(self._testbed_netrefs.items()):
        tb_data = dict(data.get_attribute_data(testbed_guid))
        for name in list(attrs):
            value = tb_data.get(name)
            if isinstance(value, basestring):
                ref_value = self.resolve_netref_value(value)
                if ref_value is not None:
                    data.set_attribute_data(testbed_guid, name, ref_value)
                    attrs.discard(name)
                elif fail_if_undefined:
                    raise ValueError("Unresolvable netref in: %r" % (value,))
            else:
                attrs.discard(name)
        if not attrs:
            del self._testbed_netrefs[testbed_guid]
def _init_testbed_controllers(self, data, recover = False):
    """Create the missing testbed controllers and program their elements.

    Testbeds that already had a controller when this is called are left
    alone, and so are their elements.  Element labels must be unique
    (RuntimeError otherwise).  In recovery mode the controllers are not
    programmed.
    """
    # testbeds that already existed are not to be programmed again
    already_created = set(self._testbeds)
    pending_elements = []
    guid_by_label = {}

    all_guids = data.guids

    # create testbed controllers
    for guid in all_guids:
        if data.is_testbed_data(guid):
            if guid not in self._testbeds:
                self._create_testbed_controller(guid, data, pending_elements,
                        recover)
            continue

        (testbed_guid, factory_id) = data.get_box_data(guid)
        if testbed_guid in already_created:
            continue

        pending_elements.append(guid)
        label = data.get_attribute_data(guid, "label")
        if label is None:
            continue
        if label in guid_by_label:
            raise RuntimeError("Label %r is not unique" % (label,))
        guid_by_label[label] = guid

    # replace references to elements labels for its guid
    self._resolve_labels(data, all_guids, guid_by_label)

    # program testbed controllers
    if not recover:
        self._program_testbed_controllers(pending_elements, data)
def _resolve_labels(self, data, data_guids, label_guids):
    """Rewrite label-based netrefs in attribute values into GUID-based ones.

    For every string attribute holding a netref whose label is known in
    label_guids, the label is replaced by ``GUID-<guid>`` in the experiment
    data, and the attribute is memorized as needing netref resolution.
    """
    element_netrefs = self._netrefs
    testbed_netrefs = self._testbed_netrefs

    for guid in data_guids:
        for name, value in data.get_attribute_data(guid):
            if not isinstance(value, basestring):
                continue
            match = ATTRIBUTE_PATTERN_BASE.search(value)
            if not match:
                continue
            label = match.group("label")
            if label.startswith('GUID-'):
                continue
            ref_guid = label_guids.get(label)
            if ref_guid is None:
                continue

            replacement = ATTRIBUTE_PATTERN_GUID_SUB % dict(
                guid = 'GUID-%d' % (ref_guid,),
                expr = match.group("expr"),
                label = label)
            value = ATTRIBUTE_PATTERN_BASE.sub(replacement, value)
            data.set_attribute_data(guid, name, value)

            # memorize which guid-attribute pairs require
            # postprocessing, to avoid excessive controller-testbed
            # communication at configuration time
            # (which could require high-latency network I/O)
            if data.is_testbed_data(guid):
                testbed_netrefs[guid].add(name)
            else:
                (testbed_guid, factory_id) = data.get_box_data(guid)
                element_netrefs[(testbed_guid, guid)].add(name)
def _create_testbed_controller(self, guid, data, element_guids, recover):
    """Create the controller of testbed ``guid``, unless it must wait.

    A deployment configuration is built from the testbed attributes when
    none is known yet.  If one of those attributes still holds a netref,
    the testbed is memorized as netreffed and nothing is created.
    ``element_guids`` is accepted but not used here.
    """
    (testbed_id, testbed_version) = data.get_testbed_data(guid)
    config = self._deployment_config.get(guid)

    if config is None:
        # need to create one
        config = proxy.AccessConfiguration()

        for (name, value) in data.get_attribute_data(guid):
            if value is None or not config.has_attribute(name):
                continue

            # if any deployment config attribute has a netref, we can't
            # create this controller yet
            has_netref = (isinstance(value, basestring)
                and ATTRIBUTE_PATTERN_BASE.search(value))
            if has_netref:
                # remember to re-issue this one
                self._netreffed_testbeds.add(guid)
                return

            # copy deployment config attribute
            config.set_attribute_value(name, value)

        # commit config
        self._deployment_config[guid] = config

    if config is not None:
        # force recovery mode
        config.set_attribute_value("recover",recover)

    controller = proxy.create_testbed_controller(testbed_id,
            testbed_version, config)
    for (name, value) in data.get_attribute_data(guid):
        controller.defer_configure(name, value)
    self._testbeds[guid] = controller

    # the testbed is no longer waiting on netrefs
    self._netreffed_testbeds.discard(guid)
def _program_testbed_controllers(self, element_guids, data):
    """Instruct the testbeds about the given elements.

    A first pass declares every element and its attributes.  A second pass
    declares connections within the same testbed, traces, addresses and
    routes.  Elements whose testbed has no controller are skipped.
    """
    # first pass: elements and their initial attributes
    for guid in element_guids:
        (testbed_guid, factory_id) = data.get_box_data(guid)
        testbed = self._testbeds.get(testbed_guid)
        if not testbed:
            continue
        testbed.defer_create(guid, factory_id)
        for (name, value) in data.get_attribute_data(guid):
            testbed.defer_create_set(guid, name, value)

    # second pass: everything that refers to already declared elements
    for guid in element_guids:
        (testbed_guid, factory_id) = data.get_box_data(guid)
        testbed = self._testbeds.get(testbed_guid)
        if not testbed:
            continue

        for connection in data.get_connection_data(guid):
            (connector_type_name, cross_guid,
                cross_connector_type_name) = connection
            (testbed_guid, factory_id) = data.get_box_data(guid)
            (cross_testbed_guid, cross_factory_id) = data.get_box_data(
                    cross_guid)
            # connections to other testbeds are handled separately
            if testbed_guid == cross_testbed_guid:
                testbed.defer_connect(guid, connector_type_name,
                        cross_guid, cross_connector_type_name)

        for trace_id in data.get_trace_data(guid):
            testbed.defer_add_trace(guid, trace_id)

        for address_data in data.get_address_data(guid):
            (autoconf, address, netprefix, broadcast) = address_data
            if address is not None:
                testbed.defer_add_address(guid, address, netprefix,
                        broadcast)

        for (destination, netprefix, nexthop) in data.get_route_data(guid):
            testbed.defer_add_route(guid, destination, netprefix, nexthop)
def _program_testbed_cross_connections(self, data):
    """Instruct the testbeds about connections that span two testbeds.

    Every such connection is declared on the testbed owning the element,
    and the peer is memorized as cross data for that testbed.  Elements
    whose testbed has no controller are skipped.
    """
    for guid in data.guids:
        if data.is_testbed_data(guid):
            continue

        (testbed_guid, factory_id) = data.get_box_data(guid)
        testbed = self._testbeds.get(testbed_guid)
        if not testbed:
            continue

        for connection in data.get_connection_data(guid):
            (connector_type_name, cross_guid,
                cross_connector_type_name) = connection
            (testbed_guid, factory_id) = data.get_box_data(guid)
            (cross_testbed_guid, cross_factory_id) = data.get_box_data(
                    cross_guid)
            if testbed_guid == cross_testbed_guid:
                # internal connection, not handled here
                continue

            cross_testbed = self._testbeds[cross_testbed_guid]
            cross_testbed_id = cross_testbed.testbed_id
            testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
                    cross_testbed_guid, cross_testbed_id, cross_factory_id,
                    cross_connector_type_name)
            # save cross data for later
            self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
                    cross_guid)
def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
    """Memorize that testbed_guid connects to cross_guid of cross_testbed_guid.

    Cross data is kept as: testbed guid -> cross testbed guid -> set of
    cross element guids.  ``guid`` is accepted but not stored.
    """
    per_cross_testbed = self._cross_data.setdefault(testbed_guid, dict())
    cross_guids = per_cross_testbed.setdefault(cross_testbed_guid, set())
    cross_guids.add(cross_guid)
752 def _get_cross_data(self, testbed_guid):
754 if not testbed_guid in self._cross_data:
756 for cross_testbed_guid, guid_list in \
757 self._cross_data[testbed_guid].iteritems():
758 cross_data[cross_testbed_guid] = dict()
759 cross_testbed = self._testbeds[cross_testbed_guid]
760 for cross_guid in guid_list:
761 elem_cross_data = dict(
763 _testbed_guid = cross_testbed_guid,
764 _testbed_id = cross_testbed.testbed_id,
765 _testbed_version = cross_testbed.testbed_version)
766 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
767 attributes_list = cross_testbed.get_attribute_list(cross_guid)
768 for attr_name in attributes_list:
769 attr_value = cross_testbed.get(cross_guid, attr_name)
770 elem_cross_data[attr_name] = attr_value