2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import proxy, validation
6 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
14 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
15 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
16 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
18 class ConnectorType(object):
19 def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
20 super(ConnectorType, self).__init__()
25 "The maximum number of connections allowed need to be more than 0")
28 "The minimum number of connections allowed needs to be at least 0")
29 # connector_type_id -- univoquely identifies a connector type
31 self._connector_type_id = (testbed_id.lower(), factory_id.lower(),
33 # name -- display name for the connector type
35 # max -- maximum amount of connections that this type support,
38 # min -- minimum amount of connections required by this type of connector
40 # from_connections -- connections where the other connector is the "From"
41 # to_connections -- connections where the other connector is the "To"
42 # keys in the dictionary correspond to the
43 # connector_type_id for possible connections. The value is a tuple:
44 # (can_cross, connect)
45 # can_cross: indicates if the connection is allowed accros different
47 # code: is the connection function to be invoked when the elements
49 self._from_connections = dict()
50 self._to_connections = dict()
53 def connector_type_id(self):
54 return self._connector_type_id
68 def add_from_connection(self, testbed_id, factory_id, name, can_cross, code):
69 self._from_connections[(testbed_id.lower(), factory_id.lower(),
70 name.lower())] = (can_cross, code)
72 def add_to_connection(self, testbed_id, factory_id, name, can_cross, code):
73 self._to_connections[(testbed_id.lower(), factory_id.lower(),
74 name.lower())] = (can_cross, code)
76 def can_connect(self, testbed_id, factory_id, name, count,
78 connector_type_id = (testbed_id.lower(), factory_id.lower(),
80 if connector_type_id in self._from_connections:
81 (can_cross, code) = self._from_connections[connector_type_id]
82 elif connector_type_id in self._to_connections:
83 (can_cross, code) = self._to_connections[connector_type_id]
86 return not must_cross or can_cross
88 def code_to_connect(self, testbed_id, factory_id, name):
89 connector_type_id = (testbed_id.lower(), factory_id.lower(),
91 if not connector_type_id in self._to_connections.keys():
93 (can_cross, code) = self._to_connections[connector_type_id]
96 # TODO: create_function, start_function, stop_function, status_function
98 class Factory(AttributesMap):
99 def __init__(self, factory_id, create_function, start_function,
100 stop_function, status_function, configure_function,
101 allow_addresses = False, allow_routes = False):
102 super(Factory, self).__init__()
103 self._factory_id = factory_id
104 self._allow_addresses = (allow_addresses == True)
105 self._allow_routes = (allow_routes == True)
106 self._create_function = create_function
107 self._start_function = start_function
108 self._stop_function = stop_function
109 self._status_function = status_function
110 self._configure_function = configure_function
111 self._connector_types = dict()
112 self._traces = list()
113 self._box_attributes = AttributesMap()
116 def factory_id(self):
117 return self._factory_id
120 def allow_addresses(self):
121 return self._allow_addresses
124 def allow_routes(self):
125 return self._allow_routes
128 def box_attributes(self):
129 return self._box_attributes
132 def create_function(self):
133 return self._create_function
136 def start_function(self):
137 return self._start_function
140 def stop_function(self):
141 return self._stop_function
144 def status_function(self):
145 return self._status_function
148 def configure_function(self):
149 return self._configure_function
155 def connector_type(self, name):
156 return self._connector_types[name]
158 def add_connector_type(self, connector_type):
159 self._connector_types[connector_type.name] = connector_type
161 def add_trace(self, trace_id):
162 self._traces.append(trace_id)
164 def add_box_attribute(self, name, help, type, value = None, range = None,
165 allowed = None, flags = Attribute.NoFlags, validation_function = None):
166 self._box_attributes.add_attribute(name, help, type, value, range,
167 allowed, flags, validation_function)
169 class TestbedController(object):
170 def __init__(self, testbed_id, testbed_version):
171 self._testbed_id = testbed_id
172 self._testbed_version = testbed_version
176 raise NotImplementedError
178 def defer_configure(self, name, value):
179 """Instructs setting a configuartion attribute for the testbed instance"""
180 raise NotImplementedError
182 def defer_create(self, guid, factory_id):
183 """Instructs creation of element """
184 raise NotImplementedError
186 def defer_create_set(self, guid, name, value):
187 """Instructs setting an initial attribute on an element"""
188 raise NotImplementedError
190 def defer_factory_set(self, guid, name, value):
191 """Instructs setting an attribute on a factory"""
192 raise NotImplementedError
194 def defer_connect(self, guid1, connector_type_name1, guid2,
195 connector_type_name2):
196 """Instructs creation of a connection between the given connectors"""
197 raise NotImplementedError
199 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
200 cross_testbed_id, cross_factory_id, cross_connector_type_name):
202 Instructs creation of a connection between the given connectors
203 of different testbed instances
205 raise NotImplementedError
207 def defer_add_trace(self, guid, trace_id):
208 """Instructs the addition of a trace"""
209 raise NotImplementedError
211 def defer_add_address(self, guid, address, netprefix, broadcast):
212 """Instructs the addition of an address"""
213 raise NotImplementedError
215 def defer_add_route(self, guid, destination, netprefix, nexthop):
216 """Instructs the addition of a route"""
217 raise NotImplementedError
220 """After do_setup the testbed initial configuration is done"""
221 raise NotImplementedError
225 After do_create all instructed elements are created and
228 raise NotImplementedError
230 def do_connect(self):
232 After do_connect all internal connections between testbed elements
235 raise NotImplementedError
237 def do_configure(self):
238 """After do_configure elements are configured"""
239 raise NotImplementedError
241 def do_cross_connect(self):
243 After do_cross_connect all external connections between different testbed
246 raise NotImplementedError
249 raise NotImplementedError
252 raise NotImplementedError
254 def set(self, time, guid, name, value):
255 raise NotImplementedError
257 def get(self, time, guid, name):
258 raise NotImplementedError
260 def get_route(self, guid, index, attribute):
264 guid: guid of box to query
265 index: number of routing entry to fetch
266 attribute: one of Destination, NextHop, NetPrefix
268 raise NotImplementedError
270 def get_address(self, guid, index, attribute='Address'):
274 guid: guid of box to query
275 index: number of inteface to select
276 attribute: one of Address, NetPrefix, Broadcast
278 raise NotImplementedError
280 def action(self, time, guid, action):
281 raise NotImplementedError
283 def status(self, guid):
284 raise NotImplementedError
286 def trace(self, guid, trace_id, attribute='value'):
287 raise NotImplementedError
290 raise NotImplementedError
292 class ExperimentController(object):
293 def __init__(self, experiment_xml, root_dir):
294 self._experiment_xml = experiment_xml
295 self._testbeds = dict()
296 self._access_config = dict()
297 self._netrefs = dict()
298 self._root_dir = root_dir
300 self.persist_experiment_xml()
303 def experiment_xml(self):
304 return self._experiment_xml
306 def persist_experiment_xml(self):
307 xml_path = os.path.join(self._root_dir, "experiment.xml")
308 f = open(xml_path, "w")
309 f.write(self._experiment_xml)
312 def set_access_configuration(self, testbed_guid, access_config):
313 self._access_config[testbed_guid] = access_config
315 def trace(self, testbed_guid, guid, trace_id, attribute='value'):
316 return self._testbeds[testbed_guid].trace(guid, trace_id, attribute)
319 def _parallel(callables):
320 threads = [ threading.Thread(target=callable) for callable in callables ]
321 for thread in threads:
323 for thread in threads:
327 self._create_testbed_instances()
329 # persist testbed connection data, for potential recovery
330 self._persist_testbed_proxies()
332 # perform setup in parallel for all test beds,
333 # wait for all threads to finish
334 self._parallel([testbed.do_setup
335 for testbed in self._testbeds.itervalues()])
337 # perform create-connect in parallel, wait
338 # (internal connections only)
339 self._parallel([lambda : (testbed.do_create(),
340 testbed.do_connect())
341 for testbed in self._testbeds.itervalues()])
344 self.do_netrefs(fail_if_undefined=True)
346 # perform do_configure in parallel for al testbeds
347 # (it's internal configuration for each)
348 self._parallel([testbed.do_configure
349 for testbed in self._testbeds.itervalues()])
351 # cross-connect (cannot be done in parallel)
352 for testbed in self._testbeds.values():
353 testbed.do_cross_connect()
355 # start experiment (parallel start on all testbeds)
356 self._parallel([testbed.start
357 for testbed in self._testbeds.itervalues()])
359 def _persist_testbed_proxies(self):
360 TRANSIENT = ('Recover',)
362 # persist access configuration for all testbeds, so that
363 # recovery mode can reconnect to them if it becomes necessary
364 conf = ConfigParser.RawConfigParser()
365 for testbed_guid, testbed_config in self._access_config.iteritems():
366 testbed_guid = str(testbed_guid)
367 conf.add_section(testbed_guid)
368 for attr in testbed_config.attributes_name:
369 if attr not in TRANSIENT:
370 conf.set(testbed_guid, attr,
371 testbed_config.get_attribute_value(attr))
373 f = open(os.path.join(self._root_dir, 'access_config.ini'), 'w')
377 def _load_testbed_proxies(self):
382 BOOLEAN : 'getboolean',
385 conf = ConfigParser.RawConfigParser()
386 conf.read(os.path.join(self._root_dir, 'access_config.ini'))
387 for testbed_guid in conf.sections():
388 testbed_config = proxy.AccessConfiguration()
389 for attr in conf.options(testbed_guid):
390 testbed_config.set_attribute_value(attr,
391 conf.get(testbed_guid, attr) )
393 testbed_guid = str(testbed_guid)
394 conf.add_section(testbed_guid)
395 for attr in testbed_config.attributes_name:
396 if attr not in TRANSIENT:
397 getter = getattr(conf, TYPEMAP.get(
398 testbed_config.get_attribute_type(attr),
400 testbed_config.set_attribute_value(
401 testbed_guid, attr, getter(attr))
403 def _unpersist_testbed_proxies(self):
405 os.remove(os.path.join(self._root_dir, 'access_config.ini'))
407 # Just print exceptions, this is just cleanup
409 traceback.print_exc(file=sys.stderr)
412 for testbed in self._testbeds.values():
414 self._unpersist_testbed_proxies()
417 # reload perviously persisted testbed access configurations
418 self._load_testbed_proxies()
420 # recreate testbed proxies by reconnecting only
421 self._create_testbed_instances(recover=True)
423 def is_finished(self, guid):
424 for testbed in self._testbeds.values():
425 for guid_ in testbed.guids:
427 return testbed.status(guid) == STATUS_FINISHED
428 raise RuntimeError("No element exists with guid %d" % guid)
431 for testbed in self._testbeds.values():
435 def _netref_component_split(component):
436 match = COMPONENT_PATTERN.match(component)
438 return match.group("kind"), match.group("index")
440 return component, None
442 def do_netrefs(self, fail_if_undefined = False):
443 COMPONENT_GETTERS = {
445 lambda testbed, guid, index, name :
446 testbed.get_address(guid, index, name),
448 lambda testbed, guid, index, name :
449 testbed.get_route(guid, index, name),
451 lambda testbed, guid, index, name :
452 testbed.trace(guid, index, name),
454 lambda testbed, guid, index, name :
455 testbed.get(TIME_NOW, guid, name),
458 for (testbed_guid, guid), attrs in self._netrefs.iteritems():
459 testbed = self._testbeds[testbed_guid]
461 value = testbed.get(TIME_NOW, guid, name)
462 if isinstance(value, basestring):
463 match = ATTRIBUTE_PATTERN_BASE.search(value)
465 label = match.group("label")
466 if label.startswith('GUID-'):
467 ref_guid = int(label[5:])
469 expr = match.group("expr")
470 component = match.group("component")[1:] # skip the dot
471 attribute = match.group("attribute")
473 # split compound components into component kind and index
474 # eg: 'addr[0]' -> ('addr', '0')
475 component, component_index = self._netref_component_split(component)
477 # find object and resolve expression
478 for ref_testbed in self._testbeds.itervalues():
479 if component not in COMPONENT_GETTERS:
480 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
482 value = COMPONENT_GETTERS[component](
483 ref_testbed, ref_guid, component_index, attribute)
487 # couldn't find value
488 if fail_if_undefined:
489 raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
491 def _create_testbed_instances(self, recover = False):
492 parser = XmlExperimentParser()
493 data = parser.from_xml_to_data(self._experiment_xml)
494 element_guids = list()
496 data_guids = data.guids
497 netrefs = self._netrefs
498 for guid in data_guids:
499 if data.is_testbed_data(guid):
500 (testbed_id, testbed_version) = data.get_testbed_data(guid)
501 access_config = None if guid not in self._access_config else\
502 self._access_config[guid]
504 if recover and access_config is None:
506 access_config = self._access_config[guid] = proxy.AccessConfiguration()
507 if access_config is not None:
508 # force recovery mode
509 access_config.set_attribute_value("recover",recover)
511 testbed = proxy.create_testbed_instance(testbed_id,
512 testbed_version, access_config)
513 for (name, value) in data.get_attribute_data(guid):
514 testbed.defer_configure(name, value)
515 self._testbeds[guid] = testbed
517 element_guids.append(guid)
518 label = data.get_attribute_data(guid, "label")
519 if label is not None:
520 if label in label_guids:
521 raise RuntimeError, "Label %r is not unique" % (label,)
522 label_guids[label] = guid
523 for guid in data_guids:
524 if not data.is_testbed_data(guid):
525 for name, value in data.get_attribute_data(guid):
526 if isinstance(value, basestring):
527 match = ATTRIBUTE_PATTERN_BASE.search(value)
529 label = match.group("label")
530 if not label.startswith('GUID-'):
531 ref_guid = label_guids.get(label)
532 if ref_guid is not None:
533 value = ATTRIBUTE_PATTERN_BASE.sub(
534 ATTRIBUTE_PATTERN_GUID_SUB % dict(
535 guid='GUID-%d' % (ref_guid,),
536 expr=match.group("expr"),
539 data.set_attribute_data(guid, name, value)
541 # memorize which guid-attribute pairs require
542 # postprocessing, to avoid excessive controller-testbed
543 # communication at configuration time
544 # (which could require high-latency network I/O)
545 (testbed_guid, factory_id) = data.get_box_data(guid)
546 netrefs.setdefault((testbed_guid,guid),set()).add(name)
548 self._program_testbed_instances(element_guids, data)
550 def _program_testbed_instances(self, element_guids, data):
551 for guid in element_guids:
552 (testbed_guid, factory_id) = data.get_box_data(guid)
553 testbed = self._testbeds[testbed_guid]
554 testbed.defer_create(guid, factory_id)
555 for (name, value) in data.get_attribute_data(guid):
556 testbed.defer_create_set(guid, name, value)
558 for guid in element_guids:
559 (testbed_guid, factory_id) = data.get_box_data(guid)
560 testbed = self._testbeds[testbed_guid]
561 for (connector_type_name, other_guid, other_connector_type_name) \
562 in data.get_connection_data(guid):
563 (testbed_guid, factory_id) = data.get_box_data(guid)
564 (other_testbed_guid, other_factory_id) = data.get_box_data(
566 if testbed_guid == other_testbed_guid:
567 testbed.defer_connect(guid, connector_type_name, other_guid,
568 other_connector_type_name)
570 testbed.defer_cross_connect(guid, connector_type_name, other_guid,
571 other_testbed_id, other_factory_id, other_connector_type_name)
572 for trace_id in data.get_trace_data(guid):
573 testbed.defer_add_trace(guid, trace_id)
574 for (autoconf, address, netprefix, broadcast) in \
575 data.get_address_data(guid):
577 testbed.defer_add_address(guid, address, netprefix, broadcast)
578 for (destination, netprefix, nexthop) in data.get_route_data(guid):
579 testbed.defer_add_route(guid, destination, netprefix, nexthop)