2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import proxy, validation
6 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
14 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
15 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
16 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
18 class ConnectorType(object):
19 def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
20 super(ConnectorType, self).__init__()
25 "The maximum number of connections allowed need to be more than 0")
28 "The minimum number of connections allowed needs to be at least 0")
29 # connector_type_id -- univoquely identifies a connector type
31 self._connector_type_id = (testbed_id.lower(), factory_id.lower(),
33 # name -- display name for the connector type
35 # max -- maximum amount of connections that this type support,
38 # min -- minimum amount of connections required by this type of connector
40 # from_connections -- connections where the other connector is the "From"
41 # to_connections -- connections where the other connector is the "To"
42 # keys in the dictionary correspond to the
43 # connector_type_id for possible connections. The value is a tuple:
44 # (can_cross, connect)
45 # can_cross: indicates if the connection is allowed accros different
47 # code: is the connection function to be invoked when the elements
49 self._from_connections = dict()
50 self._to_connections = dict()
53 def connector_type_id(self):
54 return self._connector_type_id
68 def add_from_connection(self, testbed_id, factory_id, name, can_cross, code):
69 self._from_connections[(testbed_id.lower(), factory_id.lower(),
70 name.lower())] = (can_cross, code)
72 def add_to_connection(self, testbed_id, factory_id, name, can_cross, code):
73 self._to_connections[(testbed_id.lower(), factory_id.lower(),
74 name.lower())] = (can_cross, code)
76 def can_connect(self, testbed_id, factory_id, name, count,
78 connector_type_id = (testbed_id.lower(), factory_id.lower(),
80 if connector_type_id in self._from_connections:
81 (can_cross, code) = self._from_connections[connector_type_id]
82 elif connector_type_id in self._to_connections:
83 (can_cross, code) = self._to_connections[connector_type_id]
86 return not must_cross or can_cross
88 def code_to_connect(self, testbed_id, factory_id, name):
89 connector_type_id = (testbed_id.lower(), factory_id.lower(),
91 if not connector_type_id in self._to_connections.keys():
93 (can_cross, code) = self._to_connections[connector_type_id]
96 # TODO: create_function, start_function, stop_function, status_function
98 class Factory(AttributesMap):
99 def __init__(self, factory_id, create_function, start_function,
100 stop_function, status_function,
101 configure_function, preconfigure_function,
102 allow_addresses = False, allow_routes = False):
103 super(Factory, self).__init__()
104 self._factory_id = factory_id
105 self._allow_addresses = (allow_addresses == True)
106 self._allow_routes = (allow_routes == True)
107 self._create_function = create_function
108 self._start_function = start_function
109 self._stop_function = stop_function
110 self._status_function = status_function
111 self._configure_function = configure_function
112 self._preconfigure_function = preconfigure_function
113 self._connector_types = dict()
114 self._traces = list()
115 self._box_attributes = AttributesMap()
118 def factory_id(self):
119 return self._factory_id
122 def allow_addresses(self):
123 return self._allow_addresses
126 def allow_routes(self):
127 return self._allow_routes
130 def box_attributes(self):
131 return self._box_attributes
134 def create_function(self):
135 return self._create_function
138 def start_function(self):
139 return self._start_function
142 def stop_function(self):
143 return self._stop_function
146 def status_function(self):
147 return self._status_function
150 def configure_function(self):
151 return self._configure_function
154 def preconfigure_function(self):
155 return self._preconfigure_function
161 def connector_type(self, name):
162 return self._connector_types[name]
164 def add_connector_type(self, connector_type):
165 self._connector_types[connector_type.name] = connector_type
167 def add_trace(self, trace_id):
168 self._traces.append(trace_id)
170 def add_box_attribute(self, name, help, type, value = None, range = None,
171 allowed = None, flags = Attribute.NoFlags, validation_function = None):
172 self._box_attributes.add_attribute(name, help, type, value, range,
173 allowed, flags, validation_function)
175 class TestbedController(object):
176 def __init__(self, testbed_id, testbed_version):
177 self._testbed_id = testbed_id
178 self._testbed_version = testbed_version
182 raise NotImplementedError
184 def defer_configure(self, name, value):
185 """Instructs setting a configuartion attribute for the testbed instance"""
186 raise NotImplementedError
188 def defer_create(self, guid, factory_id):
189 """Instructs creation of element """
190 raise NotImplementedError
192 def defer_create_set(self, guid, name, value):
193 """Instructs setting an initial attribute on an element"""
194 raise NotImplementedError
196 def defer_factory_set(self, guid, name, value):
197 """Instructs setting an attribute on a factory"""
198 raise NotImplementedError
200 def defer_connect(self, guid1, connector_type_name1, guid2,
201 connector_type_name2):
202 """Instructs creation of a connection between the given connectors"""
203 raise NotImplementedError
205 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
206 cross_testbed_id, cross_factory_id, cross_connector_type_name):
208 Instructs creation of a connection between the given connectors
209 of different testbed instances
211 raise NotImplementedError
213 def defer_add_trace(self, guid, trace_id):
214 """Instructs the addition of a trace"""
215 raise NotImplementedError
217 def defer_add_address(self, guid, address, netprefix, broadcast):
218 """Instructs the addition of an address"""
219 raise NotImplementedError
221 def defer_add_route(self, guid, destination, netprefix, nexthop):
222 """Instructs the addition of a route"""
223 raise NotImplementedError
226 """After do_setup the testbed initial configuration is done"""
227 raise NotImplementedError
231 After do_create all instructed elements are created and
234 raise NotImplementedError
236 def do_connect(self):
238 After do_connect all internal connections between testbed elements
241 raise NotImplementedError
243 def do_configure(self):
244 """After do_configure elements are configured"""
245 raise NotImplementedError
247 def do_cross_connect(self):
249 After do_cross_connect all external connections between different testbed
252 raise NotImplementedError
255 raise NotImplementedError
258 raise NotImplementedError
260 def set(self, time, guid, name, value):
261 raise NotImplementedError
263 def get(self, time, guid, name):
264 raise NotImplementedError
266 def get_route(self, guid, index, attribute):
270 guid: guid of box to query
271 index: number of routing entry to fetch
272 attribute: one of Destination, NextHop, NetPrefix
274 raise NotImplementedError
276 def get_address(self, guid, index, attribute='Address'):
280 guid: guid of box to query
281 index: number of inteface to select
282 attribute: one of Address, NetPrefix, Broadcast
284 raise NotImplementedError
286 def action(self, time, guid, action):
287 raise NotImplementedError
289 def status(self, guid):
290 raise NotImplementedError
292 def trace(self, guid, trace_id, attribute='value'):
293 raise NotImplementedError
296 raise NotImplementedError
298 class ExperimentController(object):
299 def __init__(self, experiment_xml, root_dir):
300 self._experiment_xml = experiment_xml
301 self._testbeds = dict()
302 self._access_config = dict()
303 self._netrefs = dict()
304 self._crossdata = dict()
305 self._root_dir = root_dir
307 self.persist_experiment_xml()
310 def experiment_xml(self):
311 return self._experiment_xml
313 def persist_experiment_xml(self):
314 xml_path = os.path.join(self._root_dir, "experiment.xml")
315 f = open(xml_path, "w")
316 f.write(self._experiment_xml)
319 def set_access_configuration(self, testbed_guid, access_config):
320 self._access_config[testbed_guid] = access_config
322 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
323 return self._testbeds[testbed_guid].trace(guid, trace_id, attribute)
326 def _parallel(callables):
327 threads = [ threading.Thread(target=callable) for callable in callables ]
328 for thread in threads:
330 for thread in threads:
334 self._init_testbed_controllers()
336 # persist testbed connection data, for potential recovery
337 self._persist_testbed_proxies()
339 # perform setup in parallel for all test beds,
340 # wait for all threads to finish
341 self._parallel([testbed.do_setup
342 for testbed in self._testbeds.itervalues()])
344 # perform create-connect in parallel, wait
345 # (internal connections only)
346 self._parallel([lambda : (testbed.do_create(),
347 testbed.do_connect(),
348 testbed.do_preconfigure())
349 for testbed in self._testbeds.itervalues()])
352 self.do_netrefs(fail_if_undefined=True)
354 # perform do_configure in parallel for al testbeds
355 # (it's internal configuration for each)
356 self._parallel([testbed.do_configure
357 for testbed in self._testbeds.itervalues()])
359 # cross-connect (cannot be done in parallel)
360 for testbed in self._testbeds.values():
361 testbed.do_cross_connect()
363 # start experiment (parallel start on all testbeds)
364 self._parallel([testbed.start
365 for testbed in self._testbeds.itervalues()])
367 def _persist_testbed_proxies(self):
368 TRANSIENT = ('Recover',)
370 # persist access configuration for all testbeds, so that
371 # recovery mode can reconnect to them if it becomes necessary
372 conf = ConfigParser.RawConfigParser()
373 for testbed_guid, testbed_config in self._access_config.iteritems():
374 testbed_guid = str(testbed_guid)
375 conf.add_section(testbed_guid)
376 for attr in testbed_config.attributes_name:
377 if attr not in TRANSIENT:
378 conf.set(testbed_guid, attr,
379 testbed_config.get_attribute_value(attr))
381 f = open(os.path.join(self._root_dir, 'access_config.ini'), 'w')
385 def _load_testbed_proxies(self):
390 BOOLEAN : 'getboolean',
393 conf = ConfigParser.RawConfigParser()
394 conf.read(os.path.join(self._root_dir, 'access_config.ini'))
395 for testbed_guid in conf.sections():
396 testbed_config = proxy.AccessConfiguration()
397 for attr in conf.options(testbed_guid):
398 testbed_config.set_attribute_value(attr,
399 conf.get(testbed_guid, attr) )
401 testbed_guid = str(testbed_guid)
402 conf.add_section(testbed_guid)
403 for attr in testbed_config.attributes_name:
404 if attr not in TRANSIENT:
405 getter = getattr(conf, TYPEMAP.get(
406 testbed_config.get_attribute_type(attr),
408 testbed_config.set_attribute_value(
409 testbed_guid, attr, getter(attr))
411 def _unpersist_testbed_proxies(self):
413 os.remove(os.path.join(self._root_dir, 'access_config.ini'))
415 # Just print exceptions, this is just cleanup
417 traceback.print_exc(file=sys.stderr)
420 for testbed in self._testbeds.values():
422 self._unpersist_testbed_proxies()
425 # reload perviously persisted testbed access configurations
426 self._load_testbed_proxies()
428 # recreate testbed proxies by reconnecting only
429 self._init_testbed_controllers(recover = True)
431 def is_finished(self, guid):
432 for testbed in self._testbeds.values():
433 for guid_ in testbed.guids:
435 return testbed.status(guid) == STATUS_FINISHED
436 raise RuntimeError("No element exists with guid %d" % guid)
439 for testbed in self._testbeds.values():
443 def _netref_component_split(component):
444 match = COMPONENT_PATTERN.match(component)
446 return match.group("kind"), match.group("index")
448 return component, None
450 def do_netrefs(self, fail_if_undefined = False):
451 COMPONENT_GETTERS = {
453 lambda testbed, guid, index, name :
454 testbed.get_address(guid, index, name),
456 lambda testbed, guid, index, name :
457 testbed.get_route(guid, index, name),
459 lambda testbed, guid, index, name :
460 testbed.trace(guid, index, name),
462 lambda testbed, guid, index, name :
463 testbed.get(TIME_NOW, guid, name),
466 for (testbed_guid, guid), attrs in self._netrefs.iteritems():
467 testbed = self._testbeds[testbed_guid]
469 value = testbed.get(TIME_NOW, guid, name)
470 if isinstance(value, basestring):
471 match = ATTRIBUTE_PATTERN_BASE.search(value)
473 label = match.group("label")
474 if label.startswith('GUID-'):
475 ref_guid = int(label[5:])
477 expr = match.group("expr")
478 component = match.group("component")[1:] # skip the dot
479 attribute = match.group("attribute")
481 # split compound components into component kind and index
482 # eg: 'addr[0]' -> ('addr', '0')
483 component, component_index = self._netref_component_split(component)
485 # find object and resolve expression
486 for ref_testbed in self._testbeds.itervalues():
487 if component not in COMPONENT_GETTERS:
488 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
490 ref_value = COMPONENT_GETTERS[component](
491 ref_testbed, ref_guid, component_index, attribute)
493 testbed.set(TIME_NOW, guid, name,
494 value.replace(match.group(), ref_value))
497 # couldn't find value
498 if fail_if_undefined:
499 raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
501 def _init_testbed_controllers(self, recover = False):
502 parser = XmlExperimentParser()
503 data = parser.from_xml_to_data(self._experiment_xml)
504 element_guids = list()
506 data_guids = data.guids
508 # create testbed controllers
509 for guid in data_guids:
510 if data.is_testbed_data(guid):
511 self._create_testbed_controller(guid, data, element_guids,
514 element_guids.append(guid)
515 label = data.get_attribute_data(guid, "label")
516 if label is not None:
517 if label in label_guids:
518 raise RuntimeError, "Label %r is not unique" % (label,)
519 label_guids[label] = guid
521 # replace references to elements labels for its guid
522 self._resolve_labels(data, data_guids, label_guids)
524 # program testbed controllers
526 self._program_testbed_controllers(element_guids, data)
528 def _resolve_labels(self, data, data_guids, label_guids):
529 netrefs = self._netrefs
530 for guid in data_guids:
531 if not data.is_testbed_data(guid):
532 for name, value in data.get_attribute_data(guid):
533 if isinstance(value, basestring):
534 match = ATTRIBUTE_PATTERN_BASE.search(value)
536 label = match.group("label")
537 if not label.startswith('GUID-'):
538 ref_guid = label_guids.get(label)
539 if ref_guid is not None:
540 value = ATTRIBUTE_PATTERN_BASE.sub(
541 ATTRIBUTE_PATTERN_GUID_SUB % dict(
542 guid = 'GUID-%d' % (ref_guid,),
543 expr = match.group("expr"),
546 data.set_attribute_data(guid, name, value)
548 # memorize which guid-attribute pairs require
549 # postprocessing, to avoid excessive controller-testbed
550 # communication at configuration time
551 # (which could require high-latency network I/O)
552 (testbed_guid, factory_id) = data.get_box_data(guid)
553 netrefs.setdefault((testbed_guid, guid), set()).add(name)
555 def _create_testbed_controller(self, guid, data, element_guids, recover):
556 (testbed_id, testbed_version) = data.get_testbed_data(guid)
557 access_config = None if guid not in self._access_config else\
558 self._access_config[guid]
560 if recover and access_config is None:
562 access_config = self._access_config[guid] = proxy.AccessConfiguration()
563 if access_config is not None:
564 # force recovery mode
565 access_config.set_attribute_value("recover",recover)
567 testbed = proxy.create_testbed_controller(testbed_id,
568 testbed_version, access_config)
569 for (name, value) in data.get_attribute_data(guid):
570 testbed.defer_configure(name, value)
571 self._testbeds[guid] = testbed
573 def _program_testbed_controllers(self, element_guids, data):
574 for guid in element_guids:
575 (testbed_guid, factory_id) = data.get_box_data(guid)
576 testbed = self._testbeds[testbed_guid]
577 testbed.defer_create(guid, factory_id)
578 for (name, value) in data.get_attribute_data(guid):
579 testbed.defer_create_set(guid, name, value)
581 for guid in element_guids:
582 (testbed_guid, factory_id) = data.get_box_data(guid)
583 testbed = self._testbeds[testbed_guid]
584 for (connector_type_name, other_guid, other_connector_type_name) \
585 in data.get_connection_data(guid):
586 (testbed_guid, factory_id) = data.get_box_data(guid)
587 (other_testbed_guid, other_factory_id) = data.get_box_data(
589 if testbed_guid == other_testbed_guid:
590 testbed.defer_connect(guid, connector_type_name, other_guid,
591 other_connector_type_name)
593 testbed.defer_cross_connect(guid, connector_type_name, other_guid,
594 other_testbed_id, other_factory_id, other_connector_type_name)
595 for trace_id in data.get_trace_data(guid):
596 testbed.defer_add_trace(guid, trace_id)
597 for (autoconf, address, netprefix, broadcast) in \
598 data.get_address_data(guid):
600 testbed.defer_add_address(guid, address, netprefix, broadcast)
601 for (destination, netprefix, nexthop) in data.get_route_data(guid):
602 testbed.defer_add_route(guid, destination, netprefix, nexthop)