2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import proxy, validation
6 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
# Netref expression embedded in an attribute value, for example
#   {#[node1].addr[0].[Address]#}   or   {#[GUID-5].[up]#}
# Named groups:
#   label     -- text between the first pair of brackets
#   expr      -- everything after the label, up to (not including) "#}"
#   component -- optional ".addr[N]", ".route[N]" or ".trace[N]" (may be empty)
#   attribute -- text between the last pair of brackets
# The separator in front of "[attribute]" is a literal dot; it is escaped so
# that an arbitrary character in that position no longer matches.
ATTRIBUTE_PATTERN_BASE = re.compile(
    r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\]"
    r"(?P<expr>"
    r"(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|)"
    r"\.\[(?P<attribute>[-a-zA-Z0-9._]*)\]"
    r")#}")

# %-format template (keys: guid, expr) that rebuilds a netref with the label
# replaced by a guid-based one.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"

# Splits a compound component such as "addr[0]" into kind ("addr") and
# index ("0"); the index is captured as a string.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
# ConnectorType describes one connector kind. It stores a lower-cased
# identifying tuple plus two dictionaries ("from" and "to") that record which
# other connector types it can be linked with.
# NOTE(review): every line carries a leading number and those numbers have
# gaps, so statements that belong between some of the visible lines are
# probably missing from this listing -- confirm against the complete file.
16 class ConnectorType(object):
# NOTE(review): the max and min parameters shadow the builtins of the same
# name for the duration of __init__.
17 def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
18 super(ConnectorType, self).__init__()
23 "The maximum number of connections allowed need to be more than 0")
26 "The minimum number of connections allowed needs to be at least 0")
27 # connector_type_id -- uniquely identifies a connector type
29 self._connector_type_id = (testbed_id.lower(), factory_id.lower(),
31 # name -- display name for the connector type
33 # max -- maximum amount of connections that this type supports,
36 # min -- minimum amount of connections required by this type of connector
38 # from_connections -- connections where the other connector is the "From"
39 # to_connections -- connections where the other connector is the "To"
40 # keys in the dictionary correspond to the
41 # connector_type_id for possible connections. The value is a tuple:
42 # (can_cross, connect)
43 # can_cross: indicates if the connection is allowed across different
45 # code: is the connection function to be invoked when the elements
47 self._from_connections = dict()
48 self._to_connections = dict()
# Returns the identifying tuple built in __init__.
51 def connector_type_id(self):
52 return self._connector_type_id
# Stores (can_cross, code) in the "from" table under the lower-cased
# (testbed_id, factory_id, name) key; an existing entry is overwritten.
66 def add_from_connection(self, testbed_id, factory_id, name, can_cross, code):
67 self._from_connections[(testbed_id.lower(), factory_id.lower(),
68 name.lower())] = (can_cross, code)
# Same as add_from_connection, but for the "to" table.
70 def add_to_connection(self, testbed_id, factory_id, name, can_cross, code):
71 self._to_connections[(testbed_id.lower(), factory_id.lower(),
72 name.lower())] = (can_cross, code)
# Looks the candidate id up in the "from" table first, then in the "to"
# table. The final expression yields can_cross when must_cross is true and
# True otherwise.
# NOTE(review): the rest of the signature and the branch taken when the id is
# in neither table are not visible here.
74 def can_connect(self, testbed_id, factory_id, name, count,
76 connector_type_id = (testbed_id.lower(), factory_id.lower(),
78 if connector_type_id in self._from_connections:
79 (can_cross, code) = self._from_connections[connector_type_id]
80 elif connector_type_id in self._to_connections:
81 (can_cross, code) = self._to_connections[connector_type_id]
84 return not must_cross or can_cross
# Fetches the (can_cross, code) pair registered in the "to" table.
# NOTE(review): the membership test goes through .keys(); testing the
# dictionary itself ("not in self._to_connections") gives the same answer.
# What happens for an unknown id and what is returned are not visible here.
86 def code_to_connect(self, testbed_id, factory_id, name):
87 connector_type_id = (testbed_id.lower(), factory_id.lower(),
89 if not connector_type_id in self._to_connections.keys():
91 (can_cross, code) = self._to_connections[connector_type_id]
94 # TODO: create_function, start_function, stop_function, status_function
# Factory extends AttributesMap and bundles, for one factory_id, the
# create/start/stop/status/configure callables, two capability flags
# (addresses, routes), a dict of connector types keyed by name, a list of
# trace ids and a separate AttributesMap holding box attributes.
# NOTE(review): every line carries a leading number and those numbers have
# gaps (one line is missing in front of each accessor, and a few lines
# between configure_function and connector_type), so decorators or members
# may be missing from this listing -- confirm against the complete file.
96 class Factory(AttributesMap):
97 def __init__(self, factory_id, create_function, start_function,
98 stop_function, status_function, configure_function,
99 allow_addresses = False, allow_routes = False):
100 super(Factory, self).__init__()
101 self._factory_id = factory_id
# Comparing with True turns each flag into a bool.
# NOTE(review): truthy values that do not compare equal to True (e.g. 2 or a
# non-empty string) end up stored as False.
102 self._allow_addresses = (allow_addresses == True)
103 self._allow_routes = (allow_routes == True)
104 self._create_function = create_function
105 self._start_function = start_function
106 self._stop_function = stop_function
107 self._status_function = status_function
108 self._configure_function = configure_function
109 self._connector_types = dict()
110 self._traces = list()
111 self._box_attributes = AttributesMap()
# The accessors below each return the value stored by __init__ unchanged.
114 def factory_id(self):
115 return self._factory_id
118 def allow_addresses(self):
119 return self._allow_addresses
122 def allow_routes(self):
123 return self._allow_routes
126 def box_attributes(self):
127 return self._box_attributes
130 def create_function(self):
131 return self._create_function
134 def start_function(self):
135 return self._start_function
138 def stop_function(self):
139 return self._stop_function
142 def status_function(self):
143 return self._status_function
146 def configure_function(self):
147 return self._configure_function
# Plain dictionary lookup: an unknown name raises KeyError.
153 def connector_type(self, name):
154 return self._connector_types[name]
# Registers the connector type under connector_type.name; a previous entry
# with the same name is replaced.
156 def add_connector_type(self, connector_type):
157 self._connector_types[connector_type.name] = connector_type
# Appends trace_id to the trace list (no duplicate check).
159 def add_trace(self, trace_id):
160 self._traces.append(trace_id)
# Forwards all arguments, in order, to add_attribute() of the box-attribute
# map.
# NOTE(review): the parameters help, type and range shadow builtins inside
# this method.
162 def add_box_attribute(self, name, help, type, value = None, range = None,
163 allowed = None, flags = Attribute.NoFlags, validation_function = None):
164 self._box_attributes.add_attribute(name, help, type, value, range,
165 allowed, flags, validation_function)
# TestbedInstance stores a testbed id and version; every other visible body
# only raises NotImplementedError, so this class presumably serves as the
# interface that concrete testbed implementations override.
# NOTE(review): every line carries a leading number and those numbers have
# gaps. Several "raise NotImplementedError" lines and docstring fragments
# below have no visible def header or docstring delimiters; the missing
# lines are not part of this listing -- confirm against the complete file.
167 class TestbedInstance(object):
168 def __init__(self, testbed_id, testbed_version):
169 self._testbed_id = testbed_id
170 self._testbed_version = testbed_version
# NOTE(review): the header of the member this raise belongs to is not visible.
174 raise NotImplementedError
# defer_* methods: per their docstrings they record instructions (configure,
# create, set, connect, add trace/address/route); none is implemented here.
176 def defer_configure(self, name, value):
177 """Instructs setting a configuartion attribute for the testbed instance"""
178 raise NotImplementedError
180 def defer_create(self, guid, factory_id):
181 """Instructs creation of element """
182 raise NotImplementedError
184 def defer_create_set(self, guid, name, value):
185 """Instructs setting an initial attribute on an element"""
186 raise NotImplementedError
188 def defer_factory_set(self, guid, name, value):
189 """Instructs setting an attribute on a factory"""
190 raise NotImplementedError
192 def defer_connect(self, guid1, connector_type_name1, guid2,
193 connector_type_name2):
194 """Instructs creation of a connection between the given connectors"""
195 raise NotImplementedError
197 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
198 cross_testbed_id, cross_factory_id, cross_connector_type_name):
200 Instructs creation of a connection between the given connectors
201 of different testbed instances
203 raise NotImplementedError
205 def defer_add_trace(self, guid, trace_id):
206 """Instructs the addition of a trace"""
207 raise NotImplementedError
209 def defer_add_address(self, guid, address, netprefix, broadcast):
210 """Instructs the addition of an address"""
211 raise NotImplementedError
213 def defer_add_route(self, guid, destination, netprefix, nexthop):
214 """Instructs the addition of a route"""
215 raise NotImplementedError
# do_* methods: per their docstrings each names the state reached once the
# step has run (setup, create, connect, configure, cross connect).
# NOTE(review): the def headers for the do_setup and do_create fragments are
# not visible.
218 """After do_setup the testbed initial configuration is done"""
219 raise NotImplementedError
223 After do_create all instructed elements are created and
226 raise NotImplementedError
228 def do_connect(self):
230 After do_connect all internal connections between testbed elements
233 raise NotImplementedError
235 def do_configure(self):
236 """After do_configure elements are configured"""
237 raise NotImplementedError
239 def do_cross_connect(self):
241 After do_cross_connect all external connections between different testbed
244 raise NotImplementedError
# NOTE(review): the headers of the two members these raises belong to are not
# visible.
247 raise NotImplementedError
250 raise NotImplementedError
# Runtime accessors; all unimplemented here.
252 def set(self, time, guid, name, value):
253 raise NotImplementedError
255 def get(self, time, guid, name):
256 raise NotImplementedError
258 def get_route(self, guid, index, attribute):
262 guid: guid of box to query
263 index: number of routing entry to fetch
264 attribute: one of Destination, NextHop, NetPrefix
266 raise NotImplementedError
268 def get_address(self, guid, index, attribute='Address'):
272 guid: guid of box to query
273 index: number of inteface to select
274 attribute: one of Address, NetPrefix, Broadcast
276 raise NotImplementedError
278 def action(self, time, guid, action):
279 raise NotImplementedError
281 def status(self, guid):
282 raise NotImplementedError
284 def trace(self, guid, trace_id, attribute='value'):
285 raise NotImplementedError
# NOTE(review): the header of the member this raise belongs to is not visible.
288 raise NotImplementedError
# ExperimentController drives an experiment described by an XML document.
# State kept on the instance:
#   _testbeds      -- testbed instances keyed by testbed guid
#   _access_config -- access configuration keyed by testbed guid
#   _netrefs       -- (testbed_guid, guid) -> set of attribute names that were
#                     memorised for netref post-processing
# The code relies on Python 2-only constructs (raise E, "msg"; basestring;
# dict.iteritems()/itervalues()).
# NOTE(review): every line carries a leading number and those numbers have
# gaps, so def headers, decorators, else/if lines and closing parentheses are
# missing from this listing in several places -- confirm every note below
# against the complete file.
290 class ExperimentController(object):
291 def __init__(self, experiment_xml):
292 self._experiment_xml = experiment_xml
293 self._testbeds = dict()
294 self._access_config = dict()
295 self._netrefs = dict()
# Returns the XML text handed to the constructor.
298 def experiment_xml(self):
299 return self._experiment_xml
# Stores access_config for testbed_guid; it is read back when the testbed
# instances are created.
301 def set_access_configuration(self, testbed_guid, access_config):
302 self._access_config[testbed_guid] = access_config
# Delegates to trace() of the testbed registered under testbed_guid
# (KeyError if there is none).
304 def trace(self, testbed_guid, guid, trace_id):
305 return self._testbeds[testbed_guid].trace(guid, trace_id)
# Creates one threading.Thread per callable and loops over the threads twice.
# NOTE(review): the two loop bodies are not visible (presumably start and
# join). The function has no self parameter although it is invoked as
# self._parallel(...), which only works if it is wrapped as a staticmethod --
# no decorator is visible. The loop variable "callable" shadows the builtin.
308 def _parallel(callables):
309 threads = [ threading.Thread(target=callable) for callable in callables ]
310 for thread in threads:
312 for thread in threads:
# Start-up sequence: create the testbed instances, then setup, create+connect,
# netref resolution, configure, cross-connect and start.
# NOTE(review): the def header for this sequence is not visible.
316 self._create_testbed_instances()
318 # perform setup in parallel for all test beds,
319 # wait for all threads to finish
320 self._parallel([testbed.do_setup
321 for testbed in self._testbeds.itervalues()])
323 # perform create-connect in parallel, wait
324 # (internal connections only)
# NOTE(review): each lambda looks "testbed" up when it is called, not when it
# is created. All lambdas are built before _parallel runs any of them, so
# every thread would operate on the last testbed of the iteration. Binding
# the value per lambda (lambda testbed=testbed: ...) avoids that.
325 self._parallel([lambda : (testbed.do_create(),
326 testbed.do_connect())
327 for testbed in self._testbeds.itervalues()])
330 self.do_netrefs(fail_if_undefined=True)
332 # perform do_configure in parallel for all testbeds
333 # (it's internal configuration for each)
334 self._parallel([testbed.do_configure
335 for testbed in self._testbeds.itervalues()])
337 # cross-connect (cannot be done in parallel)
338 for testbed in self._testbeds.values():
339 testbed.do_cross_connect()
341 # start experiment (parallel start on all testbeds)
342 self._parallel([testbed.start
343 for testbed in self._testbeds.itervalues()])
# NOTE(review): header and body of the member this loop belongs to are not
# visible.
346 for testbed in self._testbeds.values():
# Searches every testbed's guids and compares the status of the element with
# STATUS_FINISHED; raises RuntimeError when the loops finish without
# returning.
# NOTE(review): the condition guarding the return is not visible.
349 def is_finished(self, guid):
350 for testbed in self._testbeds.values():
351 for guid_ in testbed.guids:
353 return testbed.status(guid) == STATUS_FINISHED
354 raise RuntimeError("No element exists with guid %d" % guid)
# NOTE(review): header and body of the member this loop belongs to are not
# visible.
357 for testbed in self._testbeds.values():
# Splits a compound component such as 'addr[0]' into (kind, index) using
# COMPONENT_PATTERN; the other visible return yields (component, None).
# NOTE(review): the branch lines selecting between the two returns are not
# visible, and there is no self parameter although the function is invoked
# as self._netref_component_split(...) -- no decorator is visible.
361 def _netref_component_split(component):
362 match = COMPONENT_PATTERN.match(component)
364 return match.group("kind"), match.group("index")
366 return component, None
# Walks the memorised (testbed_guid, guid) -> attribute names, reads each
# value from its testbed and, for string values matching
# ATTRIBUTE_PATTERN_BASE whose label starts with 'GUID-', evaluates the
# expression through a per-component getter (get_address, get_route, trace or
# get).
# NOTE(review): the keys of COMPONENT_GETTERS, the loop that binds "name" and
# the code that stores the resolved value are not visible.
# NOTE(review): component_index is the text captured by COMPONENT_PATTERN
# (a string, or None) and is forwarded to the getters unchanged.
# NOTE(review): the "component not in COMPONENT_GETTERS" check does not depend
# on ref_testbed but appears after the loop header over the testbeds.
368 def do_netrefs(self, fail_if_undefined = False):
369 COMPONENT_GETTERS = {
371 lambda testbed, guid, index, name :
372 testbed.get_address(guid, index, name),
374 lambda testbed, guid, index, name :
375 testbed.get_route(guid, index, name),
377 lambda testbed, guid, index, name :
378 testbed.trace(guid, index, name),
380 lambda testbed, guid, index, name :
381 testbed.get(TIME_NOW, guid, name),
384 for (testbed_guid, guid), attrs in self._netrefs.iteritems():
385 testbed = self._testbeds[testbed_guid]
387 value = testbed.get(TIME_NOW, guid, name)
388 if isinstance(value, basestring):
389 match = ATTRIBUTE_PATTERN_BASE.search(value)
391 label = match.group("label")
392 if label.startswith('GUID-'):
393 ref_guid = int(label[5:])
395 expr = match.group("expr")
396 component = match.group("component")[1:] # skip the dot
397 attribute = match.group("attribute")
399 # split compound components into component kind and index
400 # eg: 'addr[0]' -> ('addr', '0')
401 component, component_index = self._netref_component_split(component)
403 # find object and resolve expression
404 for ref_testbed in self._testbeds.itervalues():
405 if component not in COMPONENT_GETTERS:
406 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
408 value = COMPONENT_GETTERS[component](
409 ref_testbed, ref_guid, component_index, attribute)
413 # couldn't find value
414 if fail_if_undefined:
415 raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
# Parses the experiment XML. The first pass over the guids creates and
# configures a testbed proxy for each testbed guid, and records element guids
# and their labels (duplicate labels raise RuntimeError). The second pass
# rewrites label-based netrefs in element attributes into 'GUID-<n>' form and
# memorises which attributes carry netrefs. Finally the elements are
# programmed into their testbeds.
# NOTE(review): label_guids is read and written below but its initial
# assignment is not visible; the line separating the testbed branch from the
# element branch in the first loop is not visible either.
417 def _create_testbed_instances(self):
418 parser = XmlExperimentParser()
419 data = parser.from_xml_to_data(self._experiment_xml)
420 element_guids = list()
422 data_guids = data.guids
423 netrefs = self._netrefs
424 for guid in data_guids:
425 if data.is_testbed_data(guid):
426 (testbed_id, testbed_version) = data.get_testbed_data(guid)
427 access_config = None if guid not in self._access_config else\
428 self._access_config[guid]
429 testbed = proxy.create_testbed_instance(testbed_id,
430 testbed_version, access_config)
431 for (name, value) in data.get_attribute_data(guid):
432 testbed.defer_configure(name, value)
433 self._testbeds[guid] = testbed
435 element_guids.append(guid)
436 label = data.get_attribute_data(guid, "label")
437 if label is not None:
438 if label in label_guids:
439 raise RuntimeError, "Label %r is not unique" % (label,)
440 label_guids[label] = guid
441 for guid in data_guids:
442 if not data.is_testbed_data(guid):
443 for name, value in data.get_attribute_data(guid):
444 if isinstance(value, basestring):
445 match = ATTRIBUTE_PATTERN_BASE.search(value)
447 label = match.group("label")
448 if not label.startswith('GUID-'):
449 ref_guid = label_guids.get(label)
450 if ref_guid is not None:
451 value = ATTRIBUTE_PATTERN_BASE.sub(
452 ATTRIBUTE_PATTERN_GUID_SUB % dict(
453 guid='GUID-%d' % (ref_guid,),
454 expr=match.group("expr"),
457 data.set_attribute_data(guid, name, value)
459 # memorize which guid-attribute pairs require
460 # postprocessing, to avoid excessive controller-testbed
461 # communication at configuration time
462 # (which could require high-latency network I/O)
463 (testbed_guid, factory_id) = data.get_box_data(guid)
464 netrefs.setdefault((testbed_guid,guid),set()).add(name)
465 self._program_testbed_instances(element_guids, data)
# First loop: defers creation of every element and its initial attribute
# values on the owning testbed. Second loop: defers connections (same testbed
# -> defer_connect, otherwise defer_cross_connect), traces, addresses and
# routes.
# NOTE(review): other_testbed_id is passed to defer_cross_connect but is not
# assigned anywhere in the visible lines (only other_testbed_guid is);
# get_testbed_data(), used when the testbeds are created, returns a
# (testbed_id, testbed_version) pair and may be the intended source.
# NOTE(review): data.get_box_data(guid) is called again inside the connection
# loop with the same guid as just before it; autoconf is unpacked from the
# address data but not used in the visible lines.
467 def _program_testbed_instances(self, element_guids, data):
468 for guid in element_guids:
469 (testbed_guid, factory_id) = data.get_box_data(guid)
470 testbed = self._testbeds[testbed_guid]
471 testbed.defer_create(guid, factory_id)
472 for (name, value) in data.get_attribute_data(guid):
473 testbed.defer_create_set(guid, name, value)
475 for guid in element_guids:
476 (testbed_guid, factory_id) = data.get_box_data(guid)
477 testbed = self._testbeds[testbed_guid]
478 for (connector_type_name, other_guid, other_connector_type_name) \
479 in data.get_connection_data(guid):
480 (testbed_guid, factory_id) = data.get_box_data(guid)
481 (other_testbed_guid, other_factory_id) = data.get_box_data(
483 if testbed_guid == other_testbed_guid:
484 testbed.defer_connect(guid, connector_type_name, other_guid,
485 other_connector_type_name)
487 testbed.defer_cross_connect(guid, connector_type_name, other_guid,
488 other_testbed_id, other_factory_id, other_connector_type_name)
489 for trace_id in data.get_trace_data(guid):
490 testbed.defer_add_trace(guid, trace_id)
491 for (autoconf, address, netprefix, broadcast) in \
492 data.get_address_data(guid):
494 testbed.defer_add_address(guid, address, netprefix, broadcast)
495 for (destination, netprefix, nexthop) in data.get_route_data(guid):
496 testbed.defer_add_route(guid, destination, netprefix, nexthop)