2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.core.connector import ConnectorTypeBase
6 from nepi.util import proxy, validation
7 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
8 from nepi.util.parser._xml import XmlExperimentParser
# Network reference ("netref") syntax embedded in attribute values:
#
#   {#[<label>]<component>.[<attribute>]#}
#
# <label> is an element label or "GUID-<n>"; <component> is optional and is
# one of ".addr[<n>]", ".route[<n>]" or ".trace[<n>]".
# The separator before "[<attribute>]" is a literal dot, so it is escaped;
# unescaped it would accept any single character in that position.
ATTRIBUTE_PATTERN_BASE = re.compile(
    r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\]"
    r"(?P<expr>"
    r"(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|)"
    r"\.\[(?P<attribute>[-a-zA-Z0-9._]*)\]"
    r")#}")
# Template used to rebuild a netref once its label has been replaced by a
# "GUID-<n>" label; filled with a mapping providing "guid" and "expr".
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into kind ("addr") and
# index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20 class ConnectorType(ConnectorTypeBase):
def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
    """Create a connector type with empty connection rule tables.

    All arguments are forwarded unchanged to ConnectorTypeBase.__init__.
    """
    super(ConnectorType, self).__init__(testbed_id, factory_id, name, max, min)
    # Two rule tables, both keyed by the connector_type_id of the peer
    # connector a connection may be made with:
    #   _from_connections -- rules where the peer connector is the "From"
    #   _to_connections   -- rules where the peer connector is the "To"
    # Each value is the tuple (can_cross, init_code, compl_code) stored by
    # add_from_connection / add_to_connection:
    #   can_cross  -- whether the connection is allowed across different
    #                 testbed instances
    #   init_code, compl_code -- the connection code objects handed back by
    #                 connect_to_init_code / connect_to_compl_code
    self._to_connections = {}
    self._from_connections = {}
def add_from_connection(self, testbed_id, factory_id, name, can_cross,
        init_code, compl_code):
    """Register a rule for connections whose peer connector is the "From".

    The rule is stored under the connector type id built from testbed_id,
    factory_id and name, as the tuple (can_cross, init_code, compl_code).
    A rule already stored under the same id is replaced.
    """
    rule = (can_cross, init_code, compl_code)
    peer_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
    self._from_connections[peer_type_id] = rule
def add_to_connection(self, testbed_id, factory_id, name, can_cross,
        init_code, compl_code):
    """Register a rule for connections whose peer connector is the "To".

    The rule is stored under the connector type id built from testbed_id,
    factory_id and name, as the tuple (can_cross, init_code, compl_code).
    A rule already stored under the same id is replaced.
    """
    rule = (can_cross, init_code, compl_code)
    peer_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
    self._to_connections[peer_type_id] = rule
45 def can_connect(self, testbed_id, factory_id, name, count,
47 connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
48 for lookup_type_id in self._type_resolution_order(connector_type_id):
49 if lookup_type_id in self._from_connections:
50 (can_cross, init_code, compl_code) = self._from_connections[lookup_type_id]
51 elif lookup_type_id in self._to_connections:
52 (can_cross, init_code, compl_code) = self._to_connections[lookup_type_id]
56 return not must_cross or can_cross
60 def _connect_to_code(self, testbed_id, factory_id, name):
61 connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
62 for lookup_type_id in self._type_resolution_order(connector_type_id):
63 if lookup_type_id in self._to_connections:
64 (can_cross, init_code, compl_code) = self._to_connections[lookup_type_id]
65 return (init_code, compl_code)
def connect_to_init_code(self, testbed_id, factory_id, name):
    """Return the init code of the "To" rule for the given peer connector.

    This is the first item of the pair produced by _connect_to_code.
    """
    code_pair = self._connect_to_code(testbed_id, factory_id, name)
    return code_pair[0]
def connect_to_compl_code(self, testbed_id, factory_id, name):
    """Return the completion code of the "To" rule for the given peer connector.

    This is the second item of the pair produced by _connect_to_code.
    """
    code_pair = self._connect_to_code(testbed_id, factory_id, name)
    return code_pair[1]
75 class Factory(AttributesMap):
def __init__(self, factory_id, create_function, start_function,
        stop_function, status_function,
        configure_function, preconfigure_function,
        allow_addresses = False, has_addresses = False,
        allow_routes = False, has_routes = False):
    """Describe how elements of one kind are created and managed.

    factory_id -- identifier of this factory
    create_function, start_function, stop_function, status_function,
    configure_function, preconfigure_function -- callbacks stored as-is
        and exposed through the accessors of the same name
    allow_addresses, allow_routes -- coerced to bool
    has_addresses, has_routes -- coerced to bool; each is also forced to
        True when the corresponding allow_* flag is set
    """
    super(Factory, self).__init__()
    self._factory_id = factory_id
    self._allow_addresses = bool(allow_addresses)
    self._allow_routes = bool(allow_routes)
    # allowing addresses/routes implies having them
    self._has_addresses = bool(has_addresses) or self._allow_addresses
    self._has_routes = bool(has_routes) or self._allow_routes
    self._create_function = create_function
    self._start_function = start_function
    self._stop_function = stop_function
    self._status_function = status_function
    self._configure_function = configure_function
    self._preconfigure_function = preconfigure_function
    self._connector_types = dict()
    # add_trace() appends to this list, so it has to exist from the start
    self._traces = list()
    self._box_attributes = AttributesMap()
99 return self._factory_id
def allow_addresses(self):
    """Return the allow_addresses flag given at construction (a bool)."""
    flag = self._allow_addresses
    return flag
def allow_routes(self):
    """Return the allow_routes flag given at construction (a bool)."""
    flag = self._allow_routes
    return flag
def has_addresses(self):
    """Return the has_addresses flag computed at construction.

    It is True when has_addresses or allow_addresses was set.
    """
    flag = self._has_addresses
    return flag
def has_routes(self):
    """Return the has_routes flag computed at construction.

    It is True when has_routes or allow_routes was set.
    """
    flag = self._has_routes
    return flag
def box_attributes(self):
    """Return the AttributesMap holding this factory's box attributes."""
    attributes = self._box_attributes
    return attributes
def create_function(self):
    """Return the create callback given at construction."""
    callback = self._create_function
    return callback
def start_function(self):
    """Return the start callback given at construction."""
    callback = self._start_function
    return callback
def stop_function(self):
    """Return the stop callback given at construction."""
    callback = self._stop_function
    return callback
def status_function(self):
    """Return the status callback given at construction."""
    callback = self._status_function
    return callback
def configure_function(self):
    """Return the configure callback given at construction."""
    callback = self._configure_function
    return callback
def preconfigure_function(self):
    """Return the preconfigure callback given at construction."""
    callback = self._preconfigure_function
    return callback
def connector_type(self, name):
    """Return the connector type registered under name.

    The lookup is a plain subscript on the registry, so an unregistered
    name raises KeyError.
    """
    registry = self._connector_types
    return registry[name]
def add_connector_type(self, connector_type):
    """Register connector_type under its own name attribute.

    A connector type already registered under that name is replaced.
    """
    key = connector_type.name
    self._connector_types[key] = connector_type
def add_trace(self, trace_id):
    """Append trace_id to this factory's list of traces."""
    traces = self._traces
    traces.append(trace_id)
def add_box_attribute(self, name, help, type, value = None, range = None,
        allowed = None, flags = Attribute.NoFlags, validation_function = None):
    """Declare an attribute in this factory's box attribute map.

    Every argument is passed on positionally, in the order given here, to
    add_attribute on the box attribute map.
    """
    box_attrs = self._box_attributes
    box_attrs.add_attribute(name, help, type, value, range, allowed,
            flags, validation_function)
163 class TestbedController(object):
def __init__(self, testbed_id, testbed_version):
    """Remember the identifier and version of the controlled testbed."""
    self._testbed_version = testbed_version
    self._testbed_id = testbed_id
169 def testbed_id(self):
170 return self._testbed_id
173 def testbed_version(self):
174 return self._testbed_version
178 raise NotImplementedError
def defer_configure(self, name, value):
    """Instruct setting a configuration attribute on the testbed instance.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create(self, guid, factory_id):
    """Instruct the creation of an element.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_create_set(self, guid, name, value):
    """Instruct setting an initial attribute on an element.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_factory_set(self, guid, name, value):
    """Instruct setting an attribute on a factory.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_connect(self, guid1, connector_type_name1, guid2,
        connector_type_name2):
    """Instruct the creation of a connection between the given connectors.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_cross_connect(self, guid, connector_type_name, cross_guid,
        cross_testbed_id, cross_factory_id, cross_connector_type_name):
    """Instruct the creation of a connection between connectors that
    belong to different testbed instances.

    Not implemented in this base class: always raises NotImplementedError.

    NOTE(review): ExperimentController._program_testbed_controllers calls
    this with seven positional values (an extra cross_testbed_guid before
    cross_testbed_id) -- confirm which signature is the current one.
    """
    raise NotImplementedError()
def defer_add_trace(self, guid, trace_id):
    """Instruct the addition of a trace.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Instruct the addition of an address.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def defer_add_route(self, guid, destination, netprefix, nexthop):
    """Instruct the addition of a route.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
222 """After do_setup the testbed initial configuration is done"""
223 raise NotImplementedError
227 After do_create all instructed elements are created and
230 raise NotImplementedError
def do_connect_init(self):
    """Initiation step for the internal connections between testbed
    elements.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_connect_compl(self):
    """Completion step for the internal connections between testbed
    elements; ExperimentController runs it after do_connect_init.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_configure(self):
    """Configuration step: once it has run, elements are configured.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_cross_connect_init(self, cross_data):
    """Initiation step for all external connections, i.e. those between
    elements of different testbeds.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def do_cross_connect_compl(self, cross_data):
    """Completion step for all external connections, i.e. those between
    elements of different testbeds.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
265 raise NotImplementedError
268 raise NotImplementedError
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute name of element guid to value at the given time.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get(self, guid, name, time = TIME_NOW):
    """Read attribute name of element guid at the given time.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_route(self, guid, index, attribute):
    """Query one attribute of a routing entry of a box.

    guid: guid of the box to query
    index: number of the routing entry to fetch
    attribute: one of Destination, NextHop, NetPrefix

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_address(self, guid, index, attribute='Address'):
    """Query one attribute of an address of a box.

    guid: guid of the box to query
    index: number of the interface to select
    attribute: one of Address, NetPrefix, Broadcast

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def get_attribute_list(self, guid):
    """List the attributes of element guid.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def action(self, time, guid, action):
    """Abstract hook taking a time, an element guid and an action.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def status(self, guid):
    """Report the status of element guid.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
def trace(self, guid, trace_id, attribute='value'):
    """Fetch an attribute ('value' by default) of a trace of element guid.

    Not implemented in this base class: always raises NotImplementedError.
    """
    raise NotImplementedError()
309 raise NotImplementedError
311 class ExperimentController(object):
def __init__(self, experiment_xml, root_dir):
    """Set up empty controller state and persist the experiment XML.

    experiment_xml -- XML description of the experiment
    root_dir -- directory in which the controller keeps its files
    """
    self._experiment_xml = experiment_xml
    self._root_dir = root_dir

    # per-testbed state, keyed by testbed guid
    self._testbeds = {}
    self._deployment_config = {}
    self._cross_data = {}

    # attribute names still holding unresolved network references
    self._netrefs = collections.defaultdict(set)
    self._testbed_netrefs = collections.defaultdict(set)
    self._netreffed_testbeds = set()

    # needs _root_dir and _experiment_xml, both assigned above
    self.persist_experiment_xml()
def experiment_xml(self):
    """Return the experiment XML given at construction."""
    xml = self._experiment_xml
    return xml
def persist_experiment_xml(self):
    """Write the experiment XML to "experiment.xml" inside the root dir.

    An existing file is overwritten. Errors from open() or write()
    propagate to the caller.
    """
    xml_path = os.path.join(self._root_dir, "experiment.xml")
    # the context manager flushes and closes the file even if write() fails
    with open(xml_path, "w") as f:
        f.write(self._experiment_xml)
def trace(self, testbed_guid, guid, trace_id, attribute='value'):
    """Fetch a trace attribute of element guid from its testbed.

    The call is delegated to the trace() method of the testbed controller
    registered under testbed_guid; an unknown testbed_guid raises KeyError.
    """
    testbed = self._testbeds[testbed_guid]
    return testbed.trace(guid, trace_id, attribute)
338 def _parallel(callables):
339 threads = [ threading.Thread(target=callable) for callable in callables ]
340 for thread in threads:
342 for thread in threads:
346 parser = XmlExperimentParser()
347 data = parser.from_xml_to_data(self._experiment_xml)
349 self._init_testbed_controllers(data)
351 # persist testbed connection data, for potential recovery
352 self._persist_testbed_proxies()
def steps_to_configure(self, allowed_guids):
    """Run the pre-configuration phases on the selected testbeds.

    Only testbeds whose guid is in allowed_guids take part. For each
    phase the bound method of every selected testbed is collected and the
    whole batch is handed to self._parallel, so a phase is dispatched for
    all testbeds before the next phase is dispatched.
    """
    # setup, then create-connect (internal connections only), then
    # preconfigure
    phases = ("do_setup", "do_create", "do_connect_init",
              "do_connect_compl", "do_preconfigure")
    for phase in phases:
        self._parallel([getattr(testbed, phase)
                        for guid, testbed in self._testbeds.iteritems()
                        if guid in allowed_guids])
379 steps_to_configure(self, self._testbeds)
381 if self._netreffed_testbeds:
382 # initially resolve netrefs
383 self.do_netrefs(data, fail_if_undefined=False)
385 # rinse and repeat, for netreffed testbeds
386 netreffed_testbeds = set(self._netreffed_testbeds)
388 self._init_testbed_controllers(data)
390 # persist testbed connection data, for potential recovery
391 self._persist_testbed_proxies()
393 # configure dependant testbeds
394 steps_to_configure(self, netreffed_testbeds)
396 # final netref step, fail if anything's left unresolved
397 self.do_netrefs(data, fail_if_undefined=True)
399 # perform do_configure in parallel for all testbeds
400 # (it's internal configuration for each)
401 self._parallel([testbed.do_configure
402 for testbed in self._testbeds.itervalues()])
404 # cross-connect (cannot be done in parallel)
405 for guid, testbed in self._testbeds.iteritems():
406 cross_data = self._get_cross_data(guid)
407 testbed.do_cross_connect_init(cross_data)
408 for guid, testbed in self._testbeds.iteritems():
409 cross_data = self._get_cross_data(guid)
410 testbed.do_cross_connect_compl(cross_data)
412 # start experiment (parallel start on all testbeds)
413 self._parallel([testbed.start
414 for testbed in self._testbeds.itervalues()])
def _persist_testbed_proxies(self):
    """Save the access configuration of every testbed to disk.

    Writes "deployment_config.ini" in the root dir with one section per
    testbed guid and one option per access configuration attribute, so
    that recovery mode can reconnect to the testbeds if it becomes
    necessary. Attributes named in TRANSIENT are not saved.
    """
    TRANSIENT = ('Recover',)

    conf = ConfigParser.RawConfigParser()
    for testbed_guid, testbed_config in self._deployment_config.iteritems():
        # section names are the string form of the guid
        section = str(testbed_guid)
        conf.add_section(section)
        for attr in testbed_config.attributes_list:
            if attr not in TRANSIENT:
                conf.set(section, attr,
                    testbed_config.get_attribute_value(attr))

    # actually write the collected configuration; the context manager
    # closes the file even if writing fails
    ini_path = os.path.join(self._root_dir, 'deployment_config.ini')
    with open(ini_path, 'w') as f:
        conf.write(f)
434 def _load_testbed_proxies(self):
439 BOOLEAN : 'getboolean',
442 conf = ConfigParser.RawConfigParser()
443 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
444 for testbed_guid in conf.sections():
445 testbed_config = proxy.AccessConfiguration()
446 for attr in conf.options(testbed_guid):
447 testbed_config.set_attribute_value(attr,
448 conf.get(testbed_guid, attr) )
450 testbed_guid = str(testbed_guid)
451 conf.add_section(testbed_guid)
452 for attr in testbed_config.attributes_list:
453 if attr not in TRANSIENT:
454 getter = getattr(conf, TYPEMAP.get(
455 testbed_config.get_attribute_type(attr),
457 testbed_config.set_attribute_value(
458 testbed_guid, attr, getter(attr))
460 def _unpersist_testbed_proxies(self):
462 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
464 # Just print exceptions, this is just cleanup
466 traceback.print_exc(file=sys.stderr)
469 for testbed in self._testbeds.values():
471 self._unpersist_testbed_proxies()
474 # reload previously persisted testbed access configurations
475 self._load_testbed_proxies()
477 # recreate testbed proxies by reconnecting only
478 self._init_testbed_controllers(recover = True)
480 # another time, for netrefs
481 self._init_testbed_controllers(recover = True)
483 def is_finished(self, guid):
484 for testbed in self._testbeds.values():
485 for guid_ in testbed.guids:
487 return testbed.status(guid) == STATUS_FINISHED
488 raise RuntimeError("No element exists with guid %d" % guid)
def set(self, testbed_guid, guid, name, value, time = TIME_NOW):
    """Set attribute name of element guid through its testbed controller.

    The call is delegated to the set() method of the controller registered
    under testbed_guid; an unknown testbed_guid raises KeyError. Nothing
    is returned.
    """
    self._testbeds[testbed_guid].set(guid, name, value, time)
def get(self, testbed_guid, guid, name, time = TIME_NOW):
    """Read attribute name of element guid through its testbed controller.

    The call is delegated to the get() method of the controller registered
    under testbed_guid; an unknown testbed_guid raises KeyError.
    """
    return self._testbeds[testbed_guid].get(guid, name, time)
499 for testbed in self._testbeds.values():
503 def _netref_component_split(component):
504 match = COMPONENT_PATTERN.match(component)
506 return match.group("kind"), match.group("index")
508 return component, None
510 _NETREF_COMPONENT_GETTERS = {
512 lambda testbed, guid, index, name:
513 testbed.get_address(guid, index, name),
515 lambda testbed, guid, index, name:
516 testbed.get_route(guid, index, name),
518 lambda testbed, guid, index, name:
519 testbed.trace(guid, index, name),
521 lambda testbed, guid, index, name:
522 testbed.get(guid, name),
525 def resolve_netref_value(self, value):
526 match = ATTRIBUTE_PATTERN_BASE.search(value)
528 label = match.group("label")
529 if label.startswith('GUID-'):
530 ref_guid = int(label[5:])
532 expr = match.group("expr")
533 component = match.group("component")[1:] # skip the dot
534 attribute = match.group("attribute")
536 # split compound components into component kind and index
537 # eg: 'addr[0]' -> ('addr', '0')
538 component, component_index = self._netref_component_split(component)
540 # find object and resolve expression
541 for ref_testbed in self._testbeds.itervalues():
542 if component not in self._NETREF_COMPONENT_GETTERS:
543 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
545 ref_value = self._NETREF_COMPONENT_GETTERS[component](
546 ref_testbed, ref_guid, component_index, attribute)
548 return value.replace(match.group(), ref_value)
549 # couldn't find value
552 def do_netrefs(self, data, fail_if_undefined = False):
554 for (testbed_guid, guid), attrs in self._netrefs.iteritems():
555 testbed = self._testbeds[testbed_guid]
557 value = testbed.get(guid, name)
558 if isinstance(value, basestring):
559 ref_value = self.resolve_netref_value(value)
560 if ref_value is not None:
561 testbed.set(guid, name, ref_value)
562 elif fail_if_undefined:
563 raise ValueError, "Unresolvable netref in: %r" % (value,)
566 for testbed_guid, attrs in self._testbed_netrefs.iteritems():
567 tb_data = dict(data.get_attribute_data(testbed_guid))
570 value = tb_data.get(name)
571 if isinstance(value, basestring):
572 ref_value = self.resolve_netref_value(value)
573 if ref_value is not None:
574 data.set_attribute_data(testbed_guid, name, ref_value)
575 elif fail_if_undefined:
576 raise ValueError, "Unresolvable netref in: %r" % (value,)
578 self._netrefs.clear()
579 self._testbed_netrefs.clear()
581 def _init_testbed_controllers(self, data, recover = False):
582 blacklist_testbeds = set(self._testbeds)
583 element_guids = list()
585 data_guids = data.guids
587 # create testbed controllers
588 for guid in data_guids:
589 if data.is_testbed_data(guid):
590 if guid not in self._testbeds:
591 self._create_testbed_controller(guid, data, element_guids,
594 (testbed_guid, factory_id) = data.get_box_data(guid)
595 if testbed_guid not in blacklist_testbeds:
596 element_guids.append(guid)
597 label = data.get_attribute_data(guid, "label")
598 if label is not None:
599 if label in label_guids:
600 raise RuntimeError, "Label %r is not unique" % (label,)
601 label_guids[label] = guid
603 # replace references to elements labels for its guid
604 self._resolve_labels(data, data_guids, label_guids)
606 # program testbed controllers
608 self._program_testbed_controllers(element_guids, data)
610 def _resolve_labels(self, data, data_guids, label_guids):
611 netrefs = self._netrefs
612 testbed_netrefs = self._testbed_netrefs
613 for guid in data_guids:
614 for name, value in data.get_attribute_data(guid):
615 if isinstance(value, basestring):
616 match = ATTRIBUTE_PATTERN_BASE.search(value)
618 label = match.group("label")
619 if not label.startswith('GUID-'):
620 ref_guid = label_guids.get(label)
621 if ref_guid is not None:
622 value = ATTRIBUTE_PATTERN_BASE.sub(
623 ATTRIBUTE_PATTERN_GUID_SUB % dict(
624 guid = 'GUID-%d' % (ref_guid,),
625 expr = match.group("expr"),
628 data.set_attribute_data(guid, name, value)
630 # memorize which guid-attribute pairs require
631 # postprocessing, to avoid excessive controller-testbed
632 # communication at configuration time
633 # (which could require high-latency network I/O)
634 if not data.is_testbed_data(guid):
635 (testbed_guid, factory_id) = data.get_box_data(guid)
636 netrefs[(testbed_guid, guid)].add(name)
638 testbed_netrefs[guid].add(name)
640 def _create_testbed_controller(self, guid, data, element_guids, recover):
641 (testbed_id, testbed_version) = data.get_testbed_data(guid)
642 deployment_config = self._deployment_config.get(guid)
644 if deployment_config is None:
646 deployment_config = proxy.AccessConfiguration()
648 for (name, value) in data.get_attribute_data(guid):
649 if value is not None and deployment_config.has_attribute(name):
650 # if any deployment config attribute has a netref, we can't
651 # create this controller yet
652 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
653 # remember to re-issue this one
654 self._netreffed_testbeds.add(guid)
657 # copy deployment config attribute
658 deployment_config.set_attribute_value(name, value)
661 self._deployment_config[guid] = deployment_config
663 if deployment_config is not None:
664 # force recovery mode
665 deployment_config.set_attribute_value("recover",recover)
667 testbed = proxy.create_testbed_controller(testbed_id,
668 testbed_version, deployment_config)
669 for (name, value) in data.get_attribute_data(guid):
670 testbed.defer_configure(name, value)
671 self._testbeds[guid] = testbed
672 if guid in self._netreffed_testbeds:
673 self._netreffed_testbeds.remove(guid)
675 def _program_testbed_controllers(self, element_guids, data):
676 for guid in element_guids:
677 (testbed_guid, factory_id) = data.get_box_data(guid)
678 testbed = self._testbeds[testbed_guid]
679 testbed.defer_create(guid, factory_id)
680 for (name, value) in data.get_attribute_data(guid):
681 testbed.defer_create_set(guid, name, value)
683 for guid in element_guids:
684 (testbed_guid, factory_id) = data.get_box_data(guid)
685 testbed = self._testbeds[testbed_guid]
686 for (connector_type_name, cross_guid, cross_connector_type_name) \
687 in data.get_connection_data(guid):
688 (testbed_guid, factory_id) = data.get_box_data(guid)
689 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
691 if testbed_guid == cross_testbed_guid:
692 testbed.defer_connect(guid, connector_type_name,
693 cross_guid, cross_connector_type_name)
695 cross_testbed = self._testbeds[cross_testbed_guid]
696 cross_testbed_id = cross_testbed.testbed_id
697 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
698 cross_testbed_guid, cross_testbed_id, cross_factory_id,
699 cross_connector_type_name)
700 # save cross data for later
701 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
703 for trace_id in data.get_trace_data(guid):
704 testbed.defer_add_trace(guid, trace_id)
705 for (autoconf, address, netprefix, broadcast) in \
706 data.get_address_data(guid):
708 testbed.defer_add_address(guid, address, netprefix,
710 for (destination, netprefix, nexthop) in data.get_route_data(guid):
711 testbed.defer_add_route(guid, destination, netprefix, nexthop)
713 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
714 if testbed_guid not in self._cross_data:
715 self._cross_data[testbed_guid] = dict()
716 if cross_testbed_guid not in self._cross_data[testbed_guid]:
717 self._cross_data[testbed_guid][cross_testbed_guid] = set()
718 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
720 def _get_cross_data(self, testbed_guid):
722 if not testbed_guid in self._cross_data:
724 for cross_testbed_guid, guid_list in \
725 self._cross_data[testbed_guid].iteritems():
726 cross_data[cross_testbed_guid] = dict()
727 cross_testbed = self._testbeds[cross_testbed_guid]
728 for cross_guid in guid_list:
729 elem_cross_data = dict()
730 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
731 attributes_list = cross_testbed.get_attribute_list(cross_guid)
732 for attr_name in attributes_list:
733 attr_value = cross_testbed.get(cross_guid, attr_name)
734 elem_cross_data[attr_name] = attr_value