2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.core.connector import ConnectorTypeBase
6 from nepi.util import proxy, validation
7 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
8 from nepi.util.parser._xml import XmlExperimentParser
17 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
18 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
19 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
21 class ConnectorType(ConnectorTypeBase):
22 def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
23 super(ConnectorType, self).__init__(testbed_id, factory_id, name, max, min)
24 # from_connections -- connections where the other connector is the "From"
25 # to_connections -- connections where the other connector is the "To"
26 # keys in the dictionary correspond to the
27 # connector_type_id for possible connections. The value is a tuple:
28 # (can_cross, connect)
29 # can_cross: indicates if the connection is allowed accros different
31 # code: is the connection function to be invoked when the elements
33 self._from_connections = dict()
34 self._to_connections = dict()
36 def add_from_connection(self, testbed_id, factory_id, name, can_cross,
37 init_code, compl_code):
38 type_id = self.make_connector_type_id(testbed_id, factory_id, name)
39 self._from_connections[type_id] = (can_cross, init_code, compl_code)
41 def add_to_connection(self, testbed_id, factory_id, name, can_cross,
42 init_code, compl_code):
43 type_id = self.make_connector_type_id(testbed_id, factory_id, name)
44 self._to_connections[type_id] = (can_cross, init_code, compl_code)
46 def can_connect(self, testbed_id, factory_id, name, count,
48 connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
49 for lookup_type_id in self._type_resolution_order(connector_type_id):
50 if lookup_type_id in self._from_connections:
51 (can_cross, init_code, compl_code) = self._from_connections[lookup_type_id]
52 elif lookup_type_id in self._to_connections:
53 (can_cross, init_code, compl_code) = self._to_connections[lookup_type_id]
57 return not must_cross or can_cross
61 def _connect_to_code(self, testbed_id, factory_id, name):
62 connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
63 for lookup_type_id in self._type_resolution_order(connector_type_id):
64 if lookup_type_id in self._to_connections:
65 (can_cross, init_code, compl_code) = self._to_connections[lookup_type_id]
66 return (init_code, compl_code)
70 def connect_to_init_code(self, testbed_id, factory_id, name):
71 return self._connect_to_code(testbed_id, factory_id, name)[0]
73 def connect_to_compl_code(self, testbed_id, factory_id, name):
74 return self._connect_to_code(testbed_id, factory_id, name)[1]
76 class Factory(AttributesMap):
77 def __init__(self, factory_id, create_function, start_function,
78 stop_function, status_function,
79 configure_function, preconfigure_function,
80 allow_addresses = False, has_addresses = False,
81 allow_routes = False, has_routes = False):
82 super(Factory, self).__init__()
83 self._factory_id = factory_id
84 self._allow_addresses = bool(allow_addresses)
85 self._allow_routes = bool(allow_routes)
86 self._has_addresses = bool(has_addresses) or self._allow_addresses
87 self._has_routes = bool(has_routes) or self._allow_routes
88 self._create_function = create_function
89 self._start_function = start_function
90 self._stop_function = stop_function
91 self._status_function = status_function
92 self._configure_function = configure_function
93 self._preconfigure_function = preconfigure_function
94 self._connector_types = dict()
96 self._box_attributes = AttributesMap()
100 return self._factory_id
103 def allow_addresses(self):
104 return self._allow_addresses
107 def allow_routes(self):
108 return self._allow_routes
111 def has_addresses(self):
112 return self._has_addresses
115 def has_routes(self):
116 return self._has_routes
119 def box_attributes(self):
120 return self._box_attributes
123 def create_function(self):
124 return self._create_function
127 def start_function(self):
128 return self._start_function
131 def stop_function(self):
132 return self._stop_function
135 def status_function(self):
136 return self._status_function
139 def configure_function(self):
140 return self._configure_function
143 def preconfigure_function(self):
144 return self._preconfigure_function
150 def connector_type(self, name):
151 return self._connector_types[name]
153 def add_connector_type(self, connector_type):
154 self._connector_types[connector_type.name] = connector_type
156 def add_trace(self, trace_id):
157 self._traces.append(trace_id)
159 def add_box_attribute(self, name, help, type, value = None, range = None,
160 allowed = None, flags = Attribute.NoFlags, validation_function = None):
161 self._box_attributes.add_attribute(name, help, type, value, range,
162 allowed, flags, validation_function)
164 class TestbedController(object):
165 def __init__(self, testbed_id, testbed_version):
166 self._testbed_id = testbed_id
167 self._testbed_version = testbed_version
170 def testbed_id(self):
171 return self._testbed_id
174 def testbed_version(self):
175 return self._testbed_version
179 raise NotImplementedError
181 def defer_configure(self, name, value):
182 """Instructs setting a configuartion attribute for the testbed instance"""
183 raise NotImplementedError
185 def defer_create(self, guid, factory_id):
186 """Instructs creation of element """
187 raise NotImplementedError
189 def defer_create_set(self, guid, name, value):
190 """Instructs setting an initial attribute on an element"""
191 raise NotImplementedError
193 def defer_factory_set(self, guid, name, value):
194 """Instructs setting an attribute on a factory"""
195 raise NotImplementedError
197 def defer_connect(self, guid1, connector_type_name1, guid2,
198 connector_type_name2):
199 """Instructs creation of a connection between the given connectors"""
200 raise NotImplementedError
202 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
203 cross_testbed_id, cross_factory_id, cross_connector_type_name):
205 Instructs creation of a connection between the given connectors
206 of different testbed instances
208 raise NotImplementedError
210 def defer_add_trace(self, guid, trace_id):
211 """Instructs the addition of a trace"""
212 raise NotImplementedError
214 def defer_add_address(self, guid, address, netprefix, broadcast):
215 """Instructs the addition of an address"""
216 raise NotImplementedError
218 def defer_add_route(self, guid, destination, netprefix, nexthop):
219 """Instructs the addition of a route"""
220 raise NotImplementedError
223 """After do_setup the testbed initial configuration is done"""
224 raise NotImplementedError
228 After do_create all instructed elements are created and
231 raise NotImplementedError
233 def do_connect_init(self):
235 After do_connect_init all internal connections between testbed elements
238 raise NotImplementedError
240 def do_connect_compl(self):
242 After do_connect all internal connections between testbed elements
245 raise NotImplementedError
247 def do_configure(self):
248 """After do_configure elements are configured"""
249 raise NotImplementedError
251 def do_cross_connect_init(self, cross_data):
253 After do_cross_connect_init initiation of all external connections
254 between different testbed elements is performed
256 raise NotImplementedError
258 def do_cross_connect_compl(self, cross_data):
260 After do_cross_connect_compl completion of all external connections
261 between different testbed elements is performed
263 raise NotImplementedError
266 raise NotImplementedError
269 raise NotImplementedError
271 def set(self, guid, name, value, time = TIME_NOW):
272 raise NotImplementedError
274 def get(self, guid, name, time = TIME_NOW):
275 raise NotImplementedError
277 def get_route(self, guid, index, attribute):
281 guid: guid of box to query
282 index: number of routing entry to fetch
283 attribute: one of Destination, NextHop, NetPrefix
285 raise NotImplementedError
287 def get_address(self, guid, index, attribute='Address'):
291 guid: guid of box to query
292 index: number of inteface to select
293 attribute: one of Address, NetPrefix, Broadcast
295 raise NotImplementedError
297 def get_attribute_list(self, guid):
298 raise NotImplementedError
300 def action(self, time, guid, action):
301 raise NotImplementedError
303 def status(self, guid):
304 raise NotImplementedError
306 def trace(self, guid, trace_id, attribute='value'):
307 raise NotImplementedError
310 raise NotImplementedError
312 class ExperimentController(object):
313 def __init__(self, experiment_xml, root_dir):
314 self._experiment_xml = experiment_xml
315 self._testbeds = dict()
316 self._deployment_config = dict()
317 self._netrefs = collections.defaultdict(set)
318 self._testbed_netrefs = collections.defaultdict(set)
319 self._cross_data = dict()
320 self._root_dir = root_dir
321 self._netreffed_testbeds = set()
322 self._guids_in_testbed_cache = dict()
324 self.persist_experiment_xml()
327 def experiment_xml(self):
328 return self._experiment_xml
330 def persist_experiment_xml(self):
331 xml_path = os.path.join(self._root_dir, "experiment.xml")
332 f = open(xml_path, "w")
333 f.write(self._experiment_xml)
336 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
337 return self._testbeds[testbed_guid].trace(guid, trace_id, attribute)
340 def _parallel(callables):
343 @functools.wraps(callable)
344 def wrapped(*p, **kw):
349 traceback.print_exc(file=sys.stderr)
352 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
353 for thread in threads:
355 for thread in threads:
361 parser = XmlExperimentParser()
362 data = parser.from_xml_to_data(self._experiment_xml)
364 self._init_testbed_controllers(data)
366 # persist testbed connection data, for potential recovery
367 self._persist_testbed_proxies()
369 def steps_to_configure(self, allowed_guids):
370 # perform setup in parallel for all test beds,
371 # wait for all threads to finish
372 self._parallel([testbed.do_setup
373 for guid,testbed in self._testbeds.iteritems()
374 if guid in allowed_guids])
376 # perform create-connect in parallel, wait
377 # (internal connections only)
378 self._parallel([testbed.do_create
379 for guid,testbed in self._testbeds.iteritems()
380 if guid in allowed_guids])
382 self._parallel([testbed.do_connect_init
383 for guid,testbed in self._testbeds.iteritems()
384 if guid in allowed_guids])
386 self._parallel([testbed.do_connect_compl
387 for guid,testbed in self._testbeds.iteritems()
388 if guid in allowed_guids])
390 self._parallel([testbed.do_preconfigure
391 for guid,testbed in self._testbeds.iteritems()
392 if guid in allowed_guids])
394 steps_to_configure(self, self._testbeds)
396 if self._netreffed_testbeds:
397 # initally resolve netrefs
398 self.do_netrefs(data, fail_if_undefined=False)
400 # rinse and repeat, for netreffed testbeds
401 netreffed_testbeds = set(self._netreffed_testbeds)
403 self._init_testbed_controllers(data)
405 # persist testbed connection data, for potential recovery
406 self._persist_testbed_proxies()
408 # configure dependant testbeds
409 steps_to_configure(self, netreffed_testbeds)
411 # final netref step, fail if anything's left unresolved
412 self.do_netrefs(data, fail_if_undefined=True)
414 self._program_testbed_cross_connections(data)
416 # perform do_configure in parallel for al testbeds
417 # (it's internal configuration for each)
418 self._parallel([testbed.do_configure
419 for testbed in self._testbeds.itervalues()])
422 #print >>sys.stderr, "DO IT"
426 # cross-connect (cannot be done in parallel)
427 for guid, testbed in self._testbeds.iteritems():
428 cross_data = self._get_cross_data(guid)
429 testbed.do_cross_connect_init(cross_data)
430 for guid, testbed in self._testbeds.iteritems():
431 cross_data = self._get_cross_data(guid)
432 testbed.do_cross_connect_compl(cross_data)
434 # start experiment (parallel start on all testbeds)
435 self._parallel([testbed.start
436 for testbed in self._testbeds.itervalues()])
438 def _persist_testbed_proxies(self):
439 TRANSIENT = ('Recover',)
441 # persist access configuration for all testbeds, so that
442 # recovery mode can reconnect to them if it becomes necessary
443 conf = ConfigParser.RawConfigParser()
444 for testbed_guid, testbed_config in self._deployment_config.iteritems():
445 testbed_guid = str(testbed_guid)
446 conf.add_section(testbed_guid)
447 for attr in testbed_config.attributes_list:
448 if attr not in TRANSIENT:
449 conf.set(testbed_guid, attr,
450 testbed_config.get_attribute_value(attr))
452 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
456 def _load_testbed_proxies(self):
461 BOOLEAN : 'getboolean',
464 conf = ConfigParser.RawConfigParser()
465 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
466 for testbed_guid in conf.sections():
467 testbed_config = proxy.AccessConfiguration()
468 for attr in conf.options(testbed_guid):
469 testbed_config.set_attribute_value(attr,
470 conf.get(testbed_guid, attr) )
472 testbed_guid = str(testbed_guid)
473 conf.add_section(testbed_guid)
474 for attr in testbed_config.attributes_list:
475 if attr not in TRANSIENT:
476 getter = getattr(conf, TYPEMAP.get(
477 testbed_config.get_attribute_type(attr),
479 testbed_config.set_attribute_value(
480 testbed_guid, attr, getter(attr))
482 def _unpersist_testbed_proxies(self):
484 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
486 # Just print exceptions, this is just cleanup
488 traceback.print_exc(file=sys.stderr)
491 for testbed in self._testbeds.values():
493 self._unpersist_testbed_proxies()
496 # reload perviously persisted testbed access configurations
497 self._load_testbed_proxies()
499 # recreate testbed proxies by reconnecting only
500 self._init_testbed_controllers(recover = True)
502 # another time, for netrefs
503 self._init_testbed_controllers(recover = True)
505 def is_finished(self, guid):
506 for testbed in self._testbeds.values():
507 for guid_ in testbed.guids:
509 return testbed.status(guid) == STATUS_FINISHED
510 raise RuntimeError("No element exists with guid %d" % guid)
512 def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
513 testbed = self._testbeds[testbed_guid]
514 testbed.set(guid, name, value, time)
516 def get(self, testbed_guid, guid, name, time = TIME_NOW):
517 testbed = self._testbeds[testbed_guid]
518 return testbed.get(guid, name, time)
521 for testbed in self._testbeds.values():
524 def _guids_in_testbed(self, testbed_guid):
525 if testbed_guid not in self._testbeds:
527 if testbed_guid not in self._guids_in_testbed_cache:
528 self._guids_in_testbed_cache[testbed_guid] = \
529 set(self._testbeds[testbed_guid].guids)
530 return self._guids_in_testbed_cache[testbed_guid]
533 def _netref_component_split(component):
534 match = COMPONENT_PATTERN.match(component)
536 return match.group("kind"), match.group("index")
538 return component, None
540 _NETREF_COMPONENT_GETTERS = {
542 lambda testbed, guid, index, name:
543 testbed.get_address(guid, int(index), name),
545 lambda testbed, guid, index, name:
546 testbed.get_route(guid, int(index), name),
548 lambda testbed, guid, index, name:
549 testbed.trace(guid, index, name),
551 lambda testbed, guid, index, name:
552 testbed.get(guid, name),
555 def resolve_netref_value(self, value):
556 match = ATTRIBUTE_PATTERN_BASE.search(value)
558 label = match.group("label")
559 if label.startswith('GUID-'):
560 ref_guid = int(label[5:])
562 expr = match.group("expr")
563 component = (match.group("component") or "")[1:] # skip the dot
564 attribute = match.group("attribute")
566 # split compound components into component kind and index
567 # eg: 'addr[0]' -> ('addr', '0')
568 component, component_index = self._netref_component_split(component)
570 # find object and resolve expression
571 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
572 if component not in self._NETREF_COMPONENT_GETTERS:
573 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
574 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
577 ref_value = self._NETREF_COMPONENT_GETTERS[component](
578 ref_testbed, ref_guid, component_index, attribute)
580 return value.replace(match.group(), ref_value)
581 # couldn't find value
584 def do_netrefs(self, data, fail_if_undefined = False):
586 for (testbed_guid, guid), attrs in self._netrefs.items():
587 testbed = self._testbeds[testbed_guid]
590 value = testbed.get(guid, name)
591 if isinstance(value, basestring):
592 ref_value = self.resolve_netref_value(value)
593 if ref_value is not None:
594 testbed.set(guid, name, ref_value)
595 elif fail_if_undefined:
596 raise ValueError, "Unresolvable netref in: %r" % (value,)
600 del self._netrefs[(testbed_guid, guid)]
603 for testbed_guid, attrs in self._testbed_netrefs.items():
604 tb_data = dict(data.get_attribute_data(testbed_guid))
608 value = tb_data.get(name)
609 if isinstance(value, basestring):
610 ref_value = self.resolve_netref_value(value)
611 if ref_value is not None:
612 data.set_attribute_data(testbed_guid, name, ref_value)
613 elif fail_if_undefined:
614 raise ValueError, "Unresolvable netref in: %r" % (value,)
618 del self._testbed_netrefs[testbed_guid]
621 def _init_testbed_controllers(self, data, recover = False):
622 blacklist_testbeds = set(self._testbeds)
623 element_guids = list()
625 data_guids = data.guids
627 # create testbed controllers
628 for guid in data_guids:
629 if data.is_testbed_data(guid):
630 if guid not in self._testbeds:
631 self._create_testbed_controller(guid, data, element_guids,
634 (testbed_guid, factory_id) = data.get_box_data(guid)
635 if testbed_guid not in blacklist_testbeds:
636 element_guids.append(guid)
637 label = data.get_attribute_data(guid, "label")
638 if label is not None:
639 if label in label_guids:
640 raise RuntimeError, "Label %r is not unique" % (label,)
641 label_guids[label] = guid
643 # replace references to elements labels for its guid
644 self._resolve_labels(data, data_guids, label_guids)
646 # program testbed controllers
648 self._program_testbed_controllers(element_guids, data)
650 def _resolve_labels(self, data, data_guids, label_guids):
651 netrefs = self._netrefs
652 testbed_netrefs = self._testbed_netrefs
653 for guid in data_guids:
654 for name, value in data.get_attribute_data(guid):
655 if isinstance(value, basestring):
656 match = ATTRIBUTE_PATTERN_BASE.search(value)
658 label = match.group("label")
659 if not label.startswith('GUID-'):
660 ref_guid = label_guids.get(label)
661 if ref_guid is not None:
662 value = ATTRIBUTE_PATTERN_BASE.sub(
663 ATTRIBUTE_PATTERN_GUID_SUB % dict(
664 guid = 'GUID-%d' % (ref_guid,),
665 expr = match.group("expr"),
668 data.set_attribute_data(guid, name, value)
670 # memorize which guid-attribute pairs require
671 # postprocessing, to avoid excessive controller-testbed
672 # communication at configuration time
673 # (which could require high-latency network I/O)
674 if not data.is_testbed_data(guid):
675 (testbed_guid, factory_id) = data.get_box_data(guid)
676 netrefs[(testbed_guid, guid)].add(name)
678 testbed_netrefs[guid].add(name)
680 def _create_testbed_controller(self, guid, data, element_guids, recover):
681 (testbed_id, testbed_version) = data.get_testbed_data(guid)
682 deployment_config = self._deployment_config.get(guid)
684 if deployment_config is None:
686 deployment_config = proxy.AccessConfiguration()
688 for (name, value) in data.get_attribute_data(guid):
689 if value is not None and deployment_config.has_attribute(name):
690 # if any deployment config attribute has a netref, we can't
691 # create this controller yet
692 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
693 # remember to re-issue this one
694 self._netreffed_testbeds.add(guid)
697 # copy deployment config attribute
698 deployment_config.set_attribute_value(name, value)
701 self._deployment_config[guid] = deployment_config
703 if deployment_config is not None:
704 # force recovery mode
705 deployment_config.set_attribute_value("recover",recover)
707 testbed = proxy.create_testbed_controller(testbed_id,
708 testbed_version, deployment_config)
709 for (name, value) in data.get_attribute_data(guid):
710 testbed.defer_configure(name, value)
711 self._testbeds[guid] = testbed
712 if guid in self._netreffed_testbeds:
713 self._netreffed_testbeds.remove(guid)
715 def _program_testbed_controllers(self, element_guids, data):
716 for guid in element_guids:
717 (testbed_guid, factory_id) = data.get_box_data(guid)
718 testbed = self._testbeds.get(testbed_guid)
720 testbed.defer_create(guid, factory_id)
721 for (name, value) in data.get_attribute_data(guid):
722 testbed.defer_create_set(guid, name, value)
724 for guid in element_guids:
725 (testbed_guid, factory_id) = data.get_box_data(guid)
726 testbed = self._testbeds.get(testbed_guid)
728 for (connector_type_name, cross_guid, cross_connector_type_name) \
729 in data.get_connection_data(guid):
730 (testbed_guid, factory_id) = data.get_box_data(guid)
731 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
733 if testbed_guid == cross_testbed_guid:
734 testbed.defer_connect(guid, connector_type_name,
735 cross_guid, cross_connector_type_name)
736 for trace_id in data.get_trace_data(guid):
737 testbed.defer_add_trace(guid, trace_id)
738 for (autoconf, address, netprefix, broadcast) in \
739 data.get_address_data(guid):
741 testbed.defer_add_address(guid, address, netprefix,
743 for (destination, netprefix, nexthop) in data.get_route_data(guid):
744 testbed.defer_add_route(guid, destination, netprefix, nexthop)
746 def _program_testbed_cross_connections(self, data):
747 data_guids = data.guids
749 for guid in data_guids:
750 if not data.is_testbed_data(guid):
751 (testbed_guid, factory_id) = data.get_box_data(guid)
752 testbed = self._testbeds.get(testbed_guid)
754 for (connector_type_name, cross_guid, cross_connector_type_name) \
755 in data.get_connection_data(guid):
756 (testbed_guid, factory_id) = data.get_box_data(guid)
757 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
759 if testbed_guid != cross_testbed_guid:
760 cross_testbed = self._testbeds[cross_testbed_guid]
761 cross_testbed_id = cross_testbed.testbed_id
762 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
763 cross_testbed_guid, cross_testbed_id, cross_factory_id,
764 cross_connector_type_name)
765 # save cross data for later
766 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
769 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
770 if testbed_guid not in self._cross_data:
771 self._cross_data[testbed_guid] = dict()
772 if cross_testbed_guid not in self._cross_data[testbed_guid]:
773 self._cross_data[testbed_guid][cross_testbed_guid] = set()
774 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
776 def _get_cross_data(self, testbed_guid):
778 if not testbed_guid in self._cross_data:
780 for cross_testbed_guid, guid_list in \
781 self._cross_data[testbed_guid].iteritems():
782 cross_data[cross_testbed_guid] = dict()
783 cross_testbed = self._testbeds[cross_testbed_guid]
784 for cross_guid in guid_list:
785 elem_cross_data = dict(
787 _testbed_guid = cross_testbed_guid,
788 _testbed_id = cross_testbed.testbed_id,
789 _testbed_version = cross_testbed.testbed_version)
790 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
791 attributes_list = cross_testbed.get_attribute_list(cross_guid)
792 for attr_name in attributes_list:
793 attr_value = cross_testbed.get(cross_guid, attr_name)
794 elem_cross_data[attr_name] = attr_value