2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.core.connector import ConnectorTypeBase
6 from nepi.util import proxy, validation
7 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
8 from nepi.util.parser._xml import XmlExperimentParser
# Matches a network reference ("netref") embedded in an attribute value:
#     {#[<label>]<component>.[<attribute>]#}
# <component> is optional and is one of .addr[N], .route[N] or .trace[N].
# Named groups: label, expr (component plus the ".[attribute]" part),
# component and attribute.
# The separator before "[<attribute>]" is an escaped literal dot; it used to
# be a bare "." and therefore accepted any character in that position.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|)\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template that rebuilds a netref with a given label ("guid") and expr.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into its kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19 class ConnectorType(ConnectorTypeBase):
def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
    """Create a connector type and its two (initially empty) peer registries.

    All arguments are forwarded unchanged to the base class initializer.
    """
    super(ConnectorType, self).__init__(testbed_id, factory_id, name, max, min)
    # Registries of the peer connector types this connector accepts:
    #   _from_connections -- connections where the other connector is the "From"
    #   _to_connections   -- connections where the other connector is the "To"
    # Keys are connector type ids; each value is the tuple
    # (can_cross, init_code, compl_code) stored by add_from_connection /
    # add_to_connection.
    self._from_connections = dict()
    self._to_connections = dict()
def add_from_connection(self, testbed_id, factory_id, name, can_cross,
        init_code, compl_code):
    """Register a peer connector type in the "From" registry.

    The peer is identified by (testbed_id, factory_id, name); the stored
    entry is the tuple (can_cross, init_code, compl_code).  Registering the
    same peer again replaces the earlier entry.
    """
    peer_id = self.make_connector_type_id(testbed_id, factory_id, name)
    entry = (can_cross, init_code, compl_code)
    self._from_connections[peer_id] = entry
def add_to_connection(self, testbed_id, factory_id, name, can_cross,
        init_code, compl_code):
    """Register a peer connector type in the "To" registry.

    The peer is identified by (testbed_id, factory_id, name); the stored
    entry is the tuple (can_cross, init_code, compl_code).  Registering the
    same peer again replaces the earlier entry.
    """
    peer_id = self.make_connector_type_id(testbed_id, factory_id, name)
    entry = (can_cross, init_code, compl_code)
    self._to_connections[peer_id] = entry
# Decide whether a connector of the given type may be connected to this one.
# NOTE(review): lines are missing from this listing -- the signature is cut
# after ``count,`` and the statements between the ``elif`` branch and the
# final ``return`` are not shown.  The notes below cover only what is visible.
44 def can_connect(self, testbed_id, factory_id, name, count,
46 connector_type_id = self.make_connector_type_id(testbed_id, factory_id, name)
# Each candidate id from _type_resolution_order is looked up first in the
# "From" registry, then in the "To" registry.
47 for lookup_type_id in self._type_resolution_order(connector_type_id):
48 if lookup_type_id in self._from_connections:
49 (can_cross, init_code, compl_code) = self._from_connections[lookup_type_id]
50 elif lookup_type_id in self._to_connections:
51 (can_cross, init_code, compl_code) = self._to_connections[lookup_type_id]
# must_cross is not defined in the visible lines; presumably it is a
# parameter from the missing part of the signature -- TODO confirm.
# ``count`` is not used by any visible line.
55 return not must_cross or can_cross
def _connect_to_code(self, testbed_id, factory_id, name):
    """Look up the (init_code, compl_code) pair registered for a "To" peer.

    The candidate ids produced by _type_resolution_order are tried in
    order; the first one present in the "To" registry wins.

    NOTE(review): when no candidate is registered, the visible code falls
    off the end and returns None, which connect_to_init_code /
    connect_to_compl_code would then fail to index.  Lines after the loop
    seem to be missing from this listing -- TODO confirm the fallback.
    """
    wanted_id = self.make_connector_type_id(testbed_id, factory_id, name)
    for candidate_id in self._type_resolution_order(wanted_id):
        if candidate_id not in self._to_connections:
            continue
        _can_cross, init_code, compl_code = self._to_connections[candidate_id]
        return (init_code, compl_code)
def connect_to_init_code(self, testbed_id, factory_id, name):
    """Return the first item (init_code) of the pair from _connect_to_code."""
    codes = self._connect_to_code(testbed_id, factory_id, name)
    return codes[0]
def connect_to_compl_code(self, testbed_id, factory_id, name):
    """Return the second item (compl_code) of the pair from _connect_to_code."""
    codes = self._connect_to_code(testbed_id, factory_id, name)
    return codes[1]
74 class Factory(AttributesMap):
def __init__(self, factory_id, create_function, start_function,
        stop_function, status_function,
        configure_function, preconfigure_function,
        allow_addresses = False, has_addresses = False,
        allow_routes = False, has_routes = False):
    """Store the factory id, its lifecycle callables and capability flags.

    factory_id: identifier kept as-is.
    create_function, start_function, stop_function, status_function,
    configure_function, preconfigure_function: stored unchanged and
        exposed through the accessors of the same names.
    allow_addresses / allow_routes: coerced to bool.
    has_addresses / has_routes: coerced to bool and forced to True
        whenever the matching allow_* flag is True.
    """
    super(Factory, self).__init__()
    self._factory_id = factory_id
    self._allow_addresses = bool(allow_addresses)
    self._allow_routes = bool(allow_routes)
    # allowing addresses/routes implies having them
    self._has_addresses = bool(has_addresses) or self._allow_addresses
    self._has_routes = bool(has_routes) or self._allow_routes
    self._create_function = create_function
    self._start_function = start_function
    self._stop_function = stop_function
    self._status_function = status_function
    self._configure_function = configure_function
    self._preconfigure_function = preconfigure_function
    self._connector_types = dict()
    # add_trace() appends to this list, so it must exist from the start.
    self._traces = list()
    self._box_attributes = AttributesMap()
# Read-only accessors for the values stored by Factory.__init__.
# NOTE(review): the lines between accessors are missing from this listing, so
# any decorators (e.g. @property) are not visible -- TODO confirm.
# The first line is the body of an accessor whose ``def`` line is not shown.
98 return self._factory_id
# Flag stored as bool(allow_addresses) by __init__.
101 def allow_addresses(self):
102 return self._allow_addresses
# Flag stored as bool(allow_routes) by __init__.
105 def allow_routes(self):
106 return self._allow_routes
# __init__ forces this flag to True whenever allow_addresses is True.
109 def has_addresses(self):
110 return self._has_addresses
# __init__ forces this flag to True whenever allow_routes is True.
113 def has_routes(self):
114 return self._has_routes
# The AttributesMap created in __init__ and filled by add_box_attribute.
117 def box_attributes(self):
118 return self._box_attributes
# The callables below are returned exactly as they were given to __init__.
121 def create_function(self):
122 return self._create_function
125 def start_function(self):
126 return self._start_function
129 def stop_function(self):
130 return self._stop_function
133 def status_function(self):
134 return self._status_function
137 def configure_function(self):
138 return self._configure_function
141 def preconfigure_function(self):
142 return self._preconfigure_function
def connector_type(self, name):
    """Return the connector type registered under *name*.

    Raises KeyError when no connector type with that name was added.
    """
    registered = self._connector_types
    return registered[name]
def add_connector_type(self, connector_type):
    """Register *connector_type* under its ``name`` attribute.

    An entry already stored under the same name is replaced.
    """
    key = connector_type.name
    self._connector_types[key] = connector_type
def add_trace(self, trace_id):
    """Append *trace_id* to this factory's list of traces."""
    traces = self._traces
    traces.append(trace_id)
def add_box_attribute(self, name, help, type, value = None, range = None,
        allowed = None, flags = Attribute.NoFlags, validation_function = None):
    """Add an attribute to the box attribute map of this factory.

    Every argument is passed through, in the same order, to
    ``add_attribute`` of the AttributesMap held in ``_box_attributes``.
    """
    box_attributes = self._box_attributes
    box_attributes.add_attribute(name, help, type, value, range,
        allowed, flags, validation_function)
# Interface of a testbed controller.  Apart from __init__ and the two id
# accessors, every visible body simply raises NotImplementedError.
# NOTE(review): this listing has lost lines -- several ``def`` headers,
# decorators and docstring delimiters are not visible.  Bodies whose header
# is missing are marked below; nothing is assumed about their names.
162 class TestbedController(object):
163 def __init__(self, testbed_id, testbed_version):
164 self._testbed_id = testbed_id
165 self._testbed_version = testbed_version
# Accessors for the two values stored by __init__ (decorators, if any, are
# not visible; other code reads ``testbed_id`` as an attribute -- TODO confirm).
168 def testbed_id(self):
169 return self._testbed_id
172 def testbed_version(self):
173 return self._testbed_version
# Body of a member whose ``def`` line is not visible.
177 raise NotImplementedError
# --- defer_* : per their docstrings these "instruct" an operation ---
179 def defer_configure(self, name, value):
180 """Instructs setting a configuartion attribute for the testbed instance"""
181 raise NotImplementedError
183 def defer_create(self, guid, factory_id):
184 """Instructs creation of element """
185 raise NotImplementedError
187 def defer_create_set(self, guid, name, value):
188 """Instructs setting an initial attribute on an element"""
189 raise NotImplementedError
191 def defer_factory_set(self, guid, name, value):
192 """Instructs setting an attribute on a factory"""
193 raise NotImplementedError
195 def defer_connect(self, guid1, connector_type_name1, guid2,
196 connector_type_name2):
197 """Instructs creation of a connection between the given connectors"""
198 raise NotImplementedError
200 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
201 cross_testbed_id, cross_factory_id, cross_connector_type_name):
# (docstring text follows; its opening/closing quotes are not visible)
203 Instructs creation of a connection between the given connectors
204 of different testbed instances
206 raise NotImplementedError
208 def defer_add_trace(self, guid, trace_id):
209 """Instructs the addition of a trace"""
210 raise NotImplementedError
212 def defer_add_address(self, guid, address, netprefix, broadcast):
213 """Instructs the addition of an address"""
214 raise NotImplementedError
216 def defer_add_route(self, guid, destination, netprefix, nexthop):
217 """Instructs the addition of a route"""
218 raise NotImplementedError
# --- do_* : docstrings describe the state reached after each phase ---
# ``def`` line not visible; the docstring mentions do_setup.
221 """After do_setup the testbed initial configuration is done"""
222 raise NotImplementedError
# ``def`` line not visible; the docstring fragment mentions do_create.
226 After do_create all instructed elements are created and
229 raise NotImplementedError
231 def do_connect_init(self):
233 After do_connect_init all internal connections between testbed elements
236 raise NotImplementedError
238 def do_connect_compl(self):
240 After do_connect all internal connections between testbed elements
243 raise NotImplementedError
245 def do_configure(self):
246 """After do_configure elements are configured"""
247 raise NotImplementedError
249 def do_cross_connect_init(self, cross_data):
251 After do_cross_connect_init initiation of all external connections
252 between different testbed elements is performed
254 raise NotImplementedError
256 def do_cross_connect_compl(self, cross_data):
258 After do_cross_connect_compl completion of all external connections
259 between different testbed elements is performed
261 raise NotImplementedError
# Two bodies whose ``def`` lines are not visible.
264 raise NotImplementedError
267 raise NotImplementedError
# --- runtime access to elements ---
269 def set(self, time, guid, name, value):
270 raise NotImplementedError
272 def get(self, time, guid, name):
273 raise NotImplementedError
275 def get_route(self, guid, index, attribute):
# (docstring text follows; its delimiters and first lines are not visible)
279 guid: guid of box to query
280 index: number of routing entry to fetch
281 attribute: one of Destination, NextHop, NetPrefix
283 raise NotImplementedError
285 def get_address(self, guid, index, attribute='Address'):
# (docstring text follows; its delimiters and first lines are not visible)
289 guid: guid of box to query
290 index: number of inteface to select
291 attribute: one of Address, NetPrefix, Broadcast
293 raise NotImplementedError
295 def get_attribute_list(self, guid):
296 raise NotImplementedError
298 def action(self, time, guid, action):
299 raise NotImplementedError
301 def status(self, guid):
302 raise NotImplementedError
304 def trace(self, guid, trace_id, attribute='value'):
305 raise NotImplementedError
# Body of a member whose ``def`` line is not visible.
308 raise NotImplementedError
310 class ExperimentController(object):
def __init__(self, experiment_xml, root_dir):
    """Set up empty bookkeeping for an experiment and persist its XML.

    experiment_xml: experiment description, kept as given.
    root_dir: directory in which persist_experiment_xml() writes
        ``experiment.xml``.
    """
    self._root_dir = root_dir
    self._experiment_xml = experiment_xml
    # registries filled in later; all of them start out empty
    self._testbeds = dict()
    self._access_config = dict()
    self._netrefs = dict()
    self._cross_data = dict()

    # write the XML to disk right away
    self.persist_experiment_xml()
def experiment_xml(self):
    """Return the experiment XML that was given at construction time."""
    xml = self._experiment_xml
    return xml
def persist_experiment_xml(self):
    """Write the experiment XML to ``experiment.xml`` in the root directory.

    An existing file is overwritten.  The file is opened in a ``with``
    block so the handle is closed (and the data flushed) even if the
    write fails.
    """
    xml_path = os.path.join(self._root_dir, "experiment.xml")
    with open(xml_path, "w") as f:
        f.write(self._experiment_xml)
def set_access_configuration(self, testbed_guid, access_config):
    """Remember *access_config* for the testbed identified by *testbed_guid*.

    A configuration stored earlier for the same guid is replaced.
    """
    configs = self._access_config
    configs[testbed_guid] = access_config
def trace(self, testbed_guid, guid, trace_id, attribute='value'):
    """Forward a trace query to the testbed registered as *testbed_guid*.

    Returns whatever that testbed's ``trace(guid, trace_id, attribute)``
    returns; raises KeyError for an unknown testbed guid.
    """
    testbed = self._testbeds[testbed_guid]
    return testbed.trace(guid, trace_id, attribute)
# Builds one threading.Thread per callable, then loops over the threads twice.
# NOTE(review): the bodies of both loops are missing from this listing
# (presumably the threads are started in the first and joined in the second
# -- TODO confirm).  The function takes no ``self`` yet is called as
# ``self._parallel(...)``, so a decorator is presumably on a line that is not
# visible -- TODO confirm.
338 def _parallel(callables):
339 threads = [ threading.Thread(target=callable) for callable in callables ]
340 for thread in threads:
342 for thread in threads:
# NOTE(review): the ``def`` line of the method these statements belong to is
# not visible in this listing; only the statements themselves are rewritten.
self._init_testbed_controllers()

# persist testbed connection data, for potential recovery
self._persist_testbed_proxies()

# Every parallel phase below hands *bound methods* to _parallel.
# The create/connect/preconfigure phases used to pass
# ``lambda : testbed.do_xxx()``; such a lambda looks ``testbed`` up only when
# it is called, after the loop has finished, so every thread acted on the
# last testbed (the "only the last testbed has elements created" symptom).

# perform setup in parallel for all test beds,
# wait for all threads to finish
self._parallel([testbed.do_setup
                for testbed in self._testbeds.itervalues()])

# perform create-connect in parallel, wait
# (internal connections only)
self._parallel([testbed.do_create
                for testbed in self._testbeds.itervalues()])

self._parallel([testbed.do_connect_init
                for testbed in self._testbeds.itervalues()])

self._parallel([testbed.do_connect_compl
                for testbed in self._testbeds.itervalues()])

self._parallel([testbed.do_preconfigure
                for testbed in self._testbeds.itervalues()])

# netrefs are resolved between preconfigure and configure; an unresolved
# reference raises at this point
self.do_netrefs(fail_if_undefined=True)

# perform do_configure in parallel for all testbeds
# (it's internal configuration for each)
self._parallel([testbed.do_configure
                for testbed in self._testbeds.itervalues()])

# cross-connect (cannot be done in parallel)
for guid, testbed in self._testbeds.iteritems():
    cross_data = self._get_cross_data(guid)
    testbed.do_cross_connect_init(cross_data)
for guid, testbed in self._testbeds.iteritems():
    cross_data = self._get_cross_data(guid)
    testbed.do_cross_connect_compl(cross_data)

# start experiment (parallel start on all testbeds)
self._parallel([testbed.start
                for testbed in self._testbeds.itervalues()])
def _persist_testbed_proxies(self):
    """Save every testbed's access configuration to ``access_config.ini``.

    One section is written per testbed guid (as a string), holding every
    attribute of its access configuration except those listed in
    TRANSIENT.  _load_testbed_proxies() reads this file back.

    The configuration used to be built and the file opened without ever
    being written or closed; it is now written inside a ``with`` block.
    """
    TRANSIENT = ('Recover',)

    # persist access configuration for all testbeds, so that
    # recovery mode can reconnect to them if it becomes necessary
    conf = ConfigParser.RawConfigParser()
    for testbed_guid, testbed_config in self._access_config.iteritems():
        testbed_guid = str(testbed_guid)
        conf.add_section(testbed_guid)
        for attr in testbed_config.attributes_list:
            if attr not in TRANSIENT:
                conf.set(testbed_guid, attr,
                    testbed_config.get_attribute_value(attr))

    ini_path = os.path.join(self._root_dir, 'access_config.ini')
    with open(ini_path, 'w') as f:
        conf.write(f)
# Reads ``access_config.ini`` from the root directory and rebuilds one
# proxy.AccessConfiguration per section.
# NOTE(review): this listing is missing lines (the start of a mapping that
# ends with the BOOLEAN entry, and several closing parentheses), so only the
# visible statements are described.  Points to verify against the full file:
#   * TRANSIENT and TYPEMAP are not defined in any visible line here;
#   * conf.add_section() is called with a name that came from
#     conf.sections() -- confirm this cannot raise for an existing section;
#   * set_attribute_value is called with two arguments in the first loop and
#     three in the second -- confirm the intended signature;
#   * no visible line stores testbed_config into self._access_config.
414 def _load_testbed_proxies(self):
419 BOOLEAN : 'getboolean',
422 conf = ConfigParser.RawConfigParser()
423 conf.read(os.path.join(self._root_dir, 'access_config.ini'))
424 for testbed_guid in conf.sections():
425 testbed_config = proxy.AccessConfiguration()
# first pass: copy every option of the section as read by conf.get()
426 for attr in conf.options(testbed_guid):
427 testbed_config.set_attribute_value(attr,
428 conf.get(testbed_guid, attr) )
430 testbed_guid = str(testbed_guid)
431 conf.add_section(testbed_guid)
# second pass: pick a conf getter by attribute type through TYPEMAP
432 for attr in testbed_config.attributes_list:
433 if attr not in TRANSIENT:
434 getter = getattr(conf, TYPEMAP.get(
435 testbed_config.get_attribute_type(attr),
437 testbed_config.set_attribute_value(
438 testbed_guid, attr, getter(attr))
def _unpersist_testbed_proxies(self):
    """Delete the persisted ``access_config.ini`` from the root directory.

    This is best-effort cleanup: any failure (for example the file not
    existing) is reported on stderr and never raised to the caller.
    """
    try:
        os.remove(os.path.join(self._root_dir, 'access_config.ini'))
    except Exception:
        # Just print exceptions, this is just cleanup.  Catching broadly is
        # deliberate: cleanup must not abort the caller.
        import sys
        import traceback
        traceback.print_exc(file=sys.stderr)
# NOTE(review): fragment -- the ``def`` line and the body of the loop are
# missing from this listing.  Visible behaviour: iterate over every testbed
# controller, then remove the persisted access configuration.
449 for testbed in self._testbeds.values():
451 self._unpersist_testbed_proxies()
# NOTE(review): fragment -- the ``def`` line these statements belong to is
# missing from this listing.
454 # reload previously persisted testbed access configurations
455 self._load_testbed_proxies()
457 # recreate testbed proxies by reconnecting only
# (recover=True is passed on to _create_testbed_controller for each testbed)
458 self._init_testbed_controllers(recover = True)
def is_finished(self, guid):
    """Tell whether the element identified by *guid* has finished.

    The testbed that lists *guid* among its ``guids`` is asked for the
    element's status, which is compared with STATUS_FINISHED.

    Raises RuntimeError when no testbed knows the guid.
    """
    for testbed in self._testbeds.values():
        # Only the owning testbed may be asked; without this check the
        # first testbed with any element would be queried for a guid it
        # does not have.
        if guid in testbed.guids:
            return testbed.status(guid) == STATUS_FINISHED
    raise RuntimeError("No element exists with guid %d" % guid)
# NOTE(review): orphan loop header -- both the ``def`` line before it and the
# loop body after it are missing from this listing, so its purpose cannot be
# stated from here.
468 for testbed in self._testbeds.values():
def _netref_component_split(component):
    """Split a compound netref component into ``(kind, index)``.

    eg: 'addr[0]' -> ('addr', '0').  The index is returned as the string
    captured by COMPONENT_PATTERN.  A component that does not have the
    ``kind[index]`` form is returned unchanged with a None index.

    NOTE(review): the function takes no ``self`` but is called as
    ``self._netref_component_split(...)``; a decorator is presumably on a
    line not visible in this listing -- TODO confirm.
    """
    match = COMPONENT_PATTERN.match(component)
    if match is None:
        # plain component (e.g. the empty string): nothing to split
        return component, None
    return match.group("kind"), match.group("index")
# Resolves netrefs: for the (testbed_guid, guid) -> attribute names recorded
# in self._netrefs, reads the current attribute value, and when it contains a
# "{#[GUID-n]...#}" reference, replaces that reference with the value fetched
# from a testbed and writes the result back with testbed.set().
# NOTE(review): many lines are missing from this listing (the keys of
# COMPONENT_GETTERS, the loop over ``attrs`` that binds ``name``, the check
# on ``match``, and the control flow around the final raise); the notes cover
# only the visible statements.
479 def do_netrefs(self, fail_if_undefined = False):
# Getter per component kind; the dictionary keys are not visible here.
480 COMPONENT_GETTERS = {
482 lambda testbed, guid, index, name :
483 testbed.get_address(guid, index, name),
485 lambda testbed, guid, index, name :
486 testbed.get_route(guid, index, name),
488 lambda testbed, guid, index, name :
489 testbed.trace(guid, index, name),
491 lambda testbed, guid, index, name :
492 testbed.get(TIME_NOW, guid, name),
495 for (testbed_guid, guid), attrs in self._netrefs.iteritems():
496 testbed = self._testbeds[testbed_guid]
498 value = testbed.get(TIME_NOW, guid, name)
499 if isinstance(value, basestring):
# search() finds only the first netref in the value
500 match = ATTRIBUTE_PATTERN_BASE.search(value)
502 label = match.group("label")
503 if label.startswith('GUID-'):
# the digits after the 'GUID-' prefix are the referenced guid
504 ref_guid = int(label[5:])
506 expr = match.group("expr")
507 component = match.group("component")[1:] # skip the dot
508 attribute = match.group("attribute")
510 # split compound components into component kind and index
511 # eg: 'addr[0]' -> ('addr', '0')
# NOTE(review): component_index is a string (or None), as returned by the
# split -- confirm the getters accept that.
512 component, component_index = self._netref_component_split(component)
514 # find object and resolve expression
515 for ref_testbed in self._testbeds.itervalues():
516 if component not in COMPONENT_GETTERS:
# (Python 2 only ``raise X, msg`` form)
517 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
519 ref_value = COMPONENT_GETTERS[component](
520 ref_testbed, ref_guid, component_index, attribute)
# NOTE(review): str.replace needs ref_value to be a string -- confirm.
522 testbed.set(TIME_NOW, guid, name,
523 value.replace(match.group(), ref_value))
526 # couldn't find value
527 if fail_if_undefined:
528 raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
# Parses the experiment XML, creates a controller for every testbed guid,
# collects the remaining guids as elements (checking that labels are unique),
# resolves label references and finally programs the controllers.
# NOTE(review): lines are missing from this listing -- ``label_guids`` is
# never initialised in the visible lines, the _create_testbed_controller call
# is cut before its last argument, and the branch separating testbeds from
# elements is not shown.
530 def _init_testbed_controllers(self, recover = False):
531 parser = XmlExperimentParser()
532 data = parser.from_xml_to_data(self._experiment_xml)
533 element_guids = list()
535 data_guids = data.guids
537 # create testbed controllers
538 for guid in data_guids:
539 if data.is_testbed_data(guid):
540 self._create_testbed_controller(guid, data, element_guids,
543 element_guids.append(guid)
544 label = data.get_attribute_data(guid, "label")
545 if label is not None:
# a label may map to one guid only
546 if label in label_guids:
547 raise RuntimeError, "Label %r is not unique" % (label,)
548 label_guids[label] = guid
550 # replace references to elements labels for its guid
551 self._resolve_labels(data, data_guids, label_guids)
553 # program testbed controllers
554_placeholder_removed = None
555 self._program_testbed_controllers(element_guids, data)
# For every non-testbed guid, looks for a netref in each string attribute
# value.  A netref whose label is not already of the form 'GUID-n' and is
# found in label_guids is rewritten to use 'GUID-<guid>'; the (testbed_guid,
# guid) pair and attribute name are recorded in self._netrefs.
# NOTE(review): lines are missing from this listing (the check on ``match``
# and the closing part of the sub() call), so the exact nesting of the last
# statements cannot be confirmed from here.
557 def _resolve_labels(self, data, data_guids, label_guids):
558 netrefs = self._netrefs
559 for guid in data_guids:
560 if not data.is_testbed_data(guid):
561 for name, value in data.get_attribute_data(guid):
562 if isinstance(value, basestring):
563 match = ATTRIBUTE_PATTERN_BASE.search(value)
565 label = match.group("label")
566 if not label.startswith('GUID-'):
# unknown labels yield None and are left untouched
567 ref_guid = label_guids.get(label)
568 if ref_guid is not None:
569 value = ATTRIBUTE_PATTERN_BASE.sub(
570 ATTRIBUTE_PATTERN_GUID_SUB % dict(
571 guid = 'GUID-%d' % (ref_guid,),
572 expr = match.group("expr"),
575 data.set_attribute_data(guid, name, value)
577 # memorize which guid-attribute pairs require
578 # postprocessing, to avoid excessive controller-testbed
579 # communication at configuration time
580 # (which could require high-latency network I/O)
581 (testbed_guid, factory_id) = data.get_box_data(guid)
582 netrefs.setdefault((testbed_guid, guid), set()).add(name)
def _create_testbed_controller(self, guid, data, element_guids, recover):
    """Create the controller for testbed *guid* and queue its configuration.

    The controller is built by proxy.create_testbed_controller from the
    testbed id/version in *data* and the access configuration registered
    for *guid* (None when there is none).  In recover mode a missing
    access configuration is created and registered first.  Every testbed
    attribute in *data* is passed to defer_configure, and the controller
    is stored in self._testbeds[guid].

    NOTE(review): the attribute is set as "recover" here, while
    _persist_testbed_proxies filters 'Recover' -- confirm whether
    attribute names are case-insensitive.
    """
    testbed_id, testbed_version = data.get_testbed_data(guid)

    access_config = self._access_config.get(guid)
    if access_config is None and recover:
        access_config = proxy.AccessConfiguration()
        self._access_config[guid] = access_config
    if access_config is not None:
        # force recovery mode
        access_config.set_attribute_value("recover", recover)

    controller = proxy.create_testbed_controller(testbed_id,
            testbed_version, access_config)
    for attr_name, attr_value in data.get_attribute_data(guid):
        controller.defer_configure(attr_name, attr_value)
    self._testbeds[guid] = controller
# Queues, on the owning testbed controller, the creation of every element and
# then its connections, traces, addresses and routes.
# NOTE(review): lines are missing from this listing (closing arguments of
# several calls and the branch between the same-testbed and cross-testbed
# cases), so nesting is described only as far as the visible lines show.
602 def _program_testbed_controllers(self, element_guids, data):
# first pass: create each element and set its initial attributes
603 for guid in element_guids:
604 (testbed_guid, factory_id) = data.get_box_data(guid)
605 testbed = self._testbeds[testbed_guid]
606 testbed.defer_create(guid, factory_id)
607 for (name, value) in data.get_attribute_data(guid):
608 testbed.defer_create_set(guid, name, value)
# second pass: connections, traces, addresses, routes
610 for guid in element_guids:
611 (testbed_guid, factory_id) = data.get_box_data(guid)
612 testbed = self._testbeds[testbed_guid]
613 for (connector_type_name, cross_guid, cross_connector_type_name) \
614 in data.get_connection_data(guid):
615 (testbed_guid, factory_id) = data.get_box_data(guid)
616 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
# same testbed on both ends: plain connection
618 if testbed_guid == cross_testbed_guid:
619 testbed.defer_connect(guid, connector_type_name,
620 cross_guid, cross_connector_type_name)
# the statements below use the other testbed's controller and id
622 cross_testbed = self._testbeds[cross_testbed_guid]
623 cross_testbed_id = cross_testbed.testbed_id
624 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
625 cross_testbed_id, cross_factory_id,
626 cross_connector_type_name)
627 # save cross data for later
628 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
630 for trace_id in data.get_trace_data(guid):
631 testbed.defer_add_trace(guid, trace_id)
# ``autoconf`` is unpacked but not used by any visible line
632 for (autoconf, address, netprefix, broadcast) in \
633 data.get_address_data(guid):
635 testbed.defer_add_address(guid, address, netprefix,
637 for (destination, netprefix, nexthop) in data.get_route_data(guid):
638 testbed.defer_add_route(guid, destination, netprefix, nexthop)
def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
    """Record a cross-testbed connection in both directions.

    After the call
    ``self._cross_data[testbed_guid][cross_testbed_guid]`` lists
    *cross_guid* and
    ``self._cross_data[cross_testbed_guid][testbed_guid]`` lists *guid*.
    Missing intermediate containers are created on demand.
    """
    cross_data = self._cross_data
    # make sure both nested lists exist before appending to either
    forward = cross_data.setdefault(testbed_guid, dict()) \
        .setdefault(cross_testbed_guid, list())
    backward = cross_data.setdefault(cross_testbed_guid, dict()) \
        .setdefault(testbed_guid, list())
    forward.append(cross_guid)
    backward.append(guid)
# Collects, for every element connected across testbeds to *testbed_guid*,
# the current value of each of its attributes, nested as
# cross_data[cross_testbed_guid][cross_guid][attr_name].
# NOTE(review): lines are missing from this listing -- ``cross_data`` is never
# initialised in the visible lines, the early exit for an unknown testbed is
# not shown, and the get() call is cut before its last argument.
652 def _get_cross_data(self, testbed_guid):
654 if not testbed_guid in self._cross_data:
656 for cross_testbed_guid, guid_list in \
657 self._cross_data[testbed_guid].iteritems():
658 cross_data[cross_testbed_guid] = dict()
659 cross_testbed = self._testbeds[cross_testbed_guid]
660 for cross_guid in guid_list:
661 elem_cross_data = dict()
662 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
# attribute values are read from the *other* testbed's controller
663 attributes_list = cross_testbed.get_attribute_list(cross_guid)
664 for attr_name in attributes_list:
665 attr_value = cross_testbed.get(TIME_NOW, cross_guid,
667 elem_cross_data[attr_name] = attr_value
# NOTE(review): this returns elem_cross_data (one element's attributes, and
# unbound if no element was visited) rather than the cross_data mapping built
# above -- looks unintended; verify against the callers, which pass the
# result to do_cross_connect_init / do_cross_connect_compl.
668 return elem_cross_data