2 # -*- coding: utf-8 -*-
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata, Parallel
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8 ApplicationStatus as AS, \
11 from nepi.util.parallel import ParallelRun
17 class TestbedController(execute.TestbedController):
18 def __init__(self, testbed_id, testbed_version):
19 super(TestbedController, self).__init__(testbed_id, testbed_version)
20 self._status = TS.STATUS_ZERO
21 # testbed attributes for validation
22 self._attributes = None
23 # element factories for validation
24 self._factories = dict()
26 # experiment construction instructions
28 self._create_set = dict()
29 self._factory_set = dict()
30 self._connect = dict()
31 self._cross_connect = dict()
32 self._add_trace = dict()
33 self._add_address = dict()
34 self._add_route = dict()
35 self._configure = dict()
37 # log of set operations
42 # testbed element instances
43 self._elements = dict()
45 self._metadata = Metadata(self._testbed_id)
46 if self._metadata.testbed_version != testbed_version:
47 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
48 (testbed_id, testbed_version, self._metadata.testbed_version))
49 for factory in self._metadata.build_factories():
50 self._factories[factory.factory_id] = factory
51 self._attributes = self._metadata.testbed_attributes()
52 self._root_directory = None
55 self._logger = logging.getLogger("nepi.core.testbed_impl")
def root_directory(self):
    """Return the root directory currently recorded on this controller."""
    current_root = self._root_directory
    return current_root
63 return self._create.keys()
def defer_configure(self, name, value):
    """Validate and record a testbed-level attribute setting.

    The attribute name and then the value are checked by the testbed
    validators; any error they raise propagates and nothing is stored.
    On success the value is written to the testbed attribute set and
    remembered in the ``_configure`` table under ``name``.
    """
    self._validate_testbed_attribute(name)
    self._validate_testbed_value(name, value)
    testbed_attributes = self._attributes
    testbed_attributes.set_attribute_value(name, value)
    self._configure.update({name: value})
def defer_create(self, guid, factory_id):
    """Register ``guid`` as a future element built by ``factory_id``.

    The factory id is validated first, then the guid is checked so that
    it is not registered twice; errors from either validator propagate.
    """
    self._validate_factory_id(factory_id)
    self._validate_not_guid(guid)
    pending_elements = self._create
    pending_elements[guid] = factory_id
def defer_create_set(self, guid, name, value):
    """Record a design-time attribute value for the element ``guid``.

    The guid, the attribute name and the value are validated in that
    order; errors from the validators propagate. The value is then kept
    in ``_create_set[guid][name]``, creating the per-guid table on first
    use.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    self._validate_box_value(guid, name, value)
    element_values = self._create_set.setdefault(guid, dict())
    element_values[name] = value
def defer_factory_set(self, guid, name, value):
    """Record a factory attribute value for the element ``guid``.

    The guid, the factory attribute name and the value are validated in
    that order; errors from the validators propagate. The value is then
    kept in ``_factory_set[guid][name]``, creating the per-guid table on
    first use.
    """
    self._validate_guid(guid)
    self._validate_factory_attribute(guid, name)
    self._validate_factory_value(guid, name, value)
    factory_values = self._factory_set.setdefault(guid, dict())
    factory_values[name] = value
96 def defer_connect(self, guid1, connector_type_name1, guid2,
97 connector_type_name2):
98 self._validate_guid(guid1)
99 self._validate_guid(guid2)
100 factory1 = self._get_factory(guid1)
101 factory_id2 = self._create[guid2]
102 connector_type = factory1.connector_type(connector_type_name1)
103 connector_type.can_connect(self._testbed_id, factory_id2,
104 connector_type_name2, False)
105 self._validate_connection(guid1, connector_type_name1, guid2,
106 connector_type_name2)
108 if not guid1 in self._connect:
109 self._connect[guid1] = dict()
110 if not connector_type_name1 in self._connect[guid1]:
111 self._connect[guid1][connector_type_name1] = dict()
112 self._connect[guid1][connector_type_name1][guid2] = \
114 if not guid2 in self._connect:
115 self._connect[guid2] = dict()
116 if not connector_type_name2 in self._connect[guid2]:
117 self._connect[guid2][connector_type_name2] = dict()
118 self._connect[guid2][connector_type_name2][guid1] = \
def defer_cross_connect(self, guid, connector_type_name, cross_guid,
        cross_testbed_guid, cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """Record a connection from a local element to one in another testbed.

    ``guid`` must be a registered element. Its factory's connector type
    ``connector_type_name`` is asked whether it can connect to the remote
    connector, and the connection is validated locally. The remote end is
    then stored in ``_cross_connect[guid][connector_type_name]`` as the
    tuple ``(cross_guid, cross_testbed_guid, cross_testbed_id,
    cross_factory_id, cross_connector_type_name)``.

    Only one remote end is kept per connector name: recording a second
    cross connection on the same connector replaces the first.
    """
    self._validate_guid(guid)
    factory = self._get_factory(guid)
    connector_type = factory.connector_type(connector_type_name)
    # NOTE(review): the result of can_connect is discarded here; this only
    # rejects anything if can_connect itself raises -- confirm.
    connector_type.can_connect(cross_testbed_id, cross_factory_id,
            cross_connector_type_name, True)
    self._validate_connection(guid, connector_type_name, cross_guid,
            cross_connector_type_name)

    # The previous code first stored an empty dict under the connector name
    # and then immediately overwrote it with the tuple; only the tuple is
    # ever observable, so it is stored directly.
    connections = self._cross_connect.setdefault(guid, dict())
    connections[connector_type_name] = (
            cross_guid, cross_testbed_guid, cross_testbed_id,
            cross_factory_id, cross_connector_type_name)
def defer_add_trace(self, guid, trace_name):
    """Remember that trace ``trace_name`` was requested for ``guid``.

    The guid and the trace name are validated first (errors propagate);
    the name is then appended to the per-guid list in ``_add_trace``.
    """
    self._validate_guid(guid)
    self._validate_trace(guid, trace_name)
    requested = self._add_trace.setdefault(guid, list())
    requested.append(trace_name)
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Queue an address for the element ``guid``.

    After the guid is validated and the element is checked for address
    support (errors propagate), the triple
    ``(address, netprefix, broadcast)`` is appended to the per-guid list
    in ``_add_address``.
    """
    self._validate_guid(guid)
    self._validate_allow_addresses(guid)
    entry = (address, netprefix, broadcast)
    self._add_address.setdefault(guid, list()).append(entry)
def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
    """Queue a routing entry for the element ``guid``.

    After the guid is validated and the element is checked for route
    support (errors propagate), the tuple
    ``(destination, netprefix, nexthop, metric)`` is appended to the
    per-guid list in ``_add_route``.
    """
    self._validate_guid(guid)
    self._validate_allow_routes(guid)
    entry = (destination, netprefix, nexthop, metric)
    self._add_route.setdefault(guid, list()).append(entry)
162 self._root_directory = self._attributes.\
163 get_attribute_value("rootDirectory")
164 self._status = TS.STATUS_SETUP
167 def set_params(self, guid):
168 parameters = self._get_parameters(guid)
169 for name, value in parameters.iteritems():
170 self.set(guid, name, value)
172 self._do_in_factory_order(
174 self._metadata.create_order,
175 postaction = set_params )
176 self._status = TS.STATUS_CREATED
178 def _do_connect(self, init = True):
179 unconnected = copy.deepcopy(self._connect)
182 for guid1, connections in unconnected.items():
183 factory1 = self._get_factory(guid1)
184 for connector_type_name1, connections2 in connections.items():
185 connector_type1 = factory1.connector_type(connector_type_name1)
186 for guid2, connector_type_name2 in connections2.items():
187 factory_id2 = self._create[guid2]
188 # Connections are executed in a "From -> To" direction only
189 # This explicitly ignores the "To -> From" (mirror)
190 # connections of every connection pair.
192 connect_code = connector_type1.connect_to_init_code(
193 self._testbed_id, factory_id2,
194 connector_type_name2,
197 connect_code = connector_type1.connect_to_compl_code(
198 self._testbed_id, factory_id2,
199 connector_type_name2,
203 delay = connect_code(self, guid1, guid2)
205 if delay is not CONNECTION_DELAY:
206 del unconnected[guid1][connector_type_name1][guid2]
207 if not unconnected[guid1][connector_type_name1]:
208 del unconnected[guid1][connector_type_name1]
209 if not unconnected[guid1]:
210 del unconnected[guid1]
212 def do_connect_init(self):
def do_connect_compl(self):
    """Run the completion phase of the deferred connections.

    Calls ``_do_connect`` with ``init`` set to False and then moves the
    controller status to ``TS.STATUS_CONNECTED``.
    """
    completion_phase = dict(init = False)
    self._do_connect(**completion_phase)
    self._status = TS.STATUS_CONNECTED
219 def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
220 logger = self._logger
222 guids = collections.defaultdict(list)
223 # order guids (elements) according to factory_id
224 for guid, factory_id in self._create.iteritems():
225 guids[factory_id].append(guid)
227 # configure elements following the factory_id order
228 for factory_id in order:
229 # Create a parallel runner if we're given a Parallel() wrapper
231 if isinstance(factory_id, Parallel):
232 runner = ParallelRun(factory_id.maxthreads)
233 factory_id = factory_id.factory
235 # omit the factories that have no element to create
236 if factory_id not in guids:
240 factory = self._factories[factory_id]
241 if isinstance(action, basestring) and not getattr(factory, action):
243 def perform_action(guid):
244 if isinstance(action, basestring):
245 getattr(factory, action)(self, guid)
249 postaction(self, guid)
251 # perform the action on all elements, in parallel if so requested
253 logger.debug("Starting parallel %s", action)
256 for guid in guids[factory_id]:
258 logger.debug("Scheduling %s on %s", action, guid)
259 runner.put(perform_action, guid)
261 logger.debug("Performing %s on %s", action, guid)
270 for guid in guids[factory_id]:
272 logger.debug("Scheduling post-%s on %s", action, guid)
273 runner.put(poststep, self, guid)
275 logger.debug("Performing post-%s on %s", action, guid)
281 logger.debug("Finished parallel %s", action)
284 def do_poststep_preconfigure(self, guid):
285 # dummy hook for implementations interested in
286 # two-phase configuration
def do_preconfigure(self):
    """Run each factory's ``preconfigure_function`` in metadata order.

    The order comes from ``self._metadata.preconfigure_order`` and
    ``do_poststep_preconfigure`` is passed as the post-step hook.
    """
    factory_order = self._metadata.preconfigure_order
    poststep_hook = self.do_poststep_preconfigure
    self._do_in_factory_order(
        'preconfigure_function', factory_order, poststep = poststep_hook)
296 def do_poststep_configure(self, guid):
297 # dummy hook for implementations interested in
298 # two-phase configuration
def do_configure(self):
    """Run each factory's ``configure_function`` in metadata order.

    The order comes from ``self._metadata.configure_order`` and
    ``do_poststep_configure`` is passed as the post-step hook. Afterwards
    the controller status becomes ``TS.STATUS_CONFIGURED``.
    """
    factory_order = self._metadata.configure_order
    poststep_hook = self.do_poststep_configure
    self._do_in_factory_order(
        'configure_function', factory_order, poststep = poststep_hook)
    self._status = TS.STATUS_CONFIGURED
308 def do_prestart(self):
309 self._do_in_factory_order(
311 self._metadata.prestart_order )
313 def _do_cross_connect(self, cross_data, init = True):
314 for guid, cross_connections in self._cross_connect.iteritems():
315 factory = self._get_factory(guid)
316 for connector_type_name, cross_connection in \
317 cross_connections.iteritems():
318 connector_type = factory.connector_type(connector_type_name)
319 (cross_guid, cross_testbed_guid, cross_testbed_id,
320 cross_factory_id, cross_connector_type_name) = cross_connection
322 connect_code = connector_type.connect_to_init_code(
323 cross_testbed_id, cross_factory_id,
324 cross_connector_type_name,
327 connect_code = connector_type.connect_to_compl_code(
328 cross_testbed_id, cross_factory_id,
329 cross_connector_type_name,
332 elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
333 connect_code(self, guid, elem_cross_data)
def do_cross_connect_init(self, cross_data):
    """Run the cross-testbed connections with ``_do_cross_connect``.

    ``cross_data`` is forwarded unchanged and ``init`` is left at the
    default of ``_do_cross_connect``. The controller status is not
    modified here.
    """
    run_cross_connect = self._do_cross_connect
    run_cross_connect(cross_data)
def do_cross_connect_compl(self, cross_data):
    """Run the completion phase of the cross-testbed connections.

    Calls ``_do_cross_connect`` with ``cross_data`` and ``init`` set to
    False, then moves the controller status to
    ``TS.STATUS_CROSS_CONNECTED``.
    """
    completion_phase = dict(init = False)
    self._do_cross_connect(cross_data, **completion_phase)
    self._status = TS.STATUS_CROSS_CONNECTED
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute ``name`` of element ``guid`` to ``value``.

    The guid, the attribute name, the value and whether the attribute may
    be modified are validated in that order; errors from the validators
    propagate. The value is then logged under
    ``_setlog[guid][time][name]`` and becomes the current value in
    ``_set[guid][name]``.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    self._validate_box_value(guid, name, value)
    self._validate_modify_box_value(guid, name)

    # Both per-guid tables are created together, keyed on the presence of
    # the guid in _set only.
    if guid not in self._set:
        self._set[guid] = dict()
        self._setlog[guid] = dict()

    history = self._setlog[guid]
    history.setdefault(time, dict())[name] = value
    self._set[guid][name] = value
def get(self, guid, name, time = TIME_NOW):
    """Return the current value of attribute ``name`` for ``guid``.

    The guid and the attribute name are validated first; errors from the
    validators propagate. The value is looked up, in order, in the values
    assigned through ``set``, then in the values given to
    ``defer_create_set``, and finally the factory's box attribute value
    is returned. ``time`` is accepted but not used by this
    implementation.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)

    # First match wins: runtime values shadow creation-time values.
    for store in (self._set, self._create_set):
        if guid in store and name in store[guid]:
            return store[guid][name]

    # if nothing else found, returns the factory default value
    box_attributes = self._get_factory(guid).box_attributes
    return box_attributes.get_attribute_value(name)
def get_route(self, guid, index, attribute):
    """Return one field of a routing entry given to ``defer_add_route``.

    ``index`` selects the entry in the order the routes were added for
    ``guid``; ``attribute`` is one of ``'Destination'``, ``'NetPrefix'``
    or ``'NextHop'``.

    Raises AttributeError if an invalid attribute is requested
    or if the indexed routing rule does not exist.

    Raises KeyError if the GUID has not been seen by
    ``defer_add_route``.
    """
    # NOTE(review): defer_add_route also stores a metric as the fourth
    # tuple element, which is not exposed through this accessor.
    ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']

    if attribute not in ATTRIBUTES:
        raise AttributeError("Attribute %r invalid for routes of %r" % (
            attribute, guid))

    attribute_index = ATTRIBUTES.index(attribute)

    routes = self._add_route.get(guid)
    if routes is None:
        raise KeyError("GUID %r not found in %s" % (guid, self._testbed_id))

    # The bound must come from the route list itself; the earlier code
    # referenced an undefined ``addresses`` name here and failed with
    # NameError on every call that got this far.
    if not (0 <= index < len(routes)):
        raise AttributeError(
            "GUID %r at %s does not have a routing entry #%s" % (
                guid, self._testbed_id, index))

    return routes[index][attribute_index]
def get_address(self, guid, index, attribute='Address'):
    """Return one field of an address given to ``defer_add_address``.

    ``index`` selects the entry in the order the addresses were added for
    ``guid``; ``attribute`` is one of ``'Address'``, ``'NetPrefix'`` or
    ``'Broadcast'``.

    Raises AttributeError if an invalid attribute is requested
    or if the indexed address does not exist.

    Raises KeyError if the GUID has not been seen by
    ``defer_add_address``.
    """
    ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']

    if attribute not in ATTRIBUTES:
        raise AttributeError("Attribute %r invalid for addresses of %r" % (
            attribute, guid))

    attribute_index = ATTRIBUTES.index(attribute)

    addresses = self._add_address.get(guid)
    if addresses is None:
        raise KeyError("GUID %r not found in %s" % (guid, self._testbed_id))

    if not (0 <= index < len(addresses)):
        raise AttributeError(
            "GUID %r at %s does not have an address #%s" % (
                guid, self._testbed_id, index))

    return addresses[index][attribute_index]
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """Return the attribute list of the factory that builds ``guid``.

    The call is delegated to the factory's ``box_attributes`` object;
    ``filter_flags`` and ``exclude`` are forwarded to it unchanged.
    """
    # An unused ``attribute_list`` local used to be created here; it was
    # never read, so it has been dropped.
    factory = self._get_factory(guid)
    return factory.box_attributes.get_attribute_list(filter_flags, exclude)
def get_factory_id(self, guid):
    """Return the ``factory_id`` of the factory registered for ``guid``."""
    return self._get_factory(guid).factory_id
437 def start(self, time = TIME_NOW):
438 self._do_in_factory_order(
440 self._metadata.start_order )
441 self._status = TS.STATUS_STARTED
443 #action: NotImplementedError
445 def stop(self, time = TIME_NOW):
446 self._do_in_factory_order(
448 reversed(self._metadata.start_order) )
449 self._status = TS.STATUS_STOPPED
451 def status(self, guid = None):
454 self._validate_guid(guid)
455 factory = self._get_factory(guid)
456 status_function = factory.status_function
458 return status_function(self, guid)
459 return AS.STATUS_UNDETERMINED
461 def trace(self, guid, trace_id, attribute='value'):
462 if attribute == 'value':
463 fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
466 elif attribute == 'path':
467 content = self.trace_filepath(guid, trace_id)
472 def traces_info(self):
474 host = self._attributes.get_attribute_value("deployment_host")
475 user = self._attributes.get_attribute_value("deployment_user")
476 for guid, trace_list in self._add_trace.iteritems():
477 traces_info[guid] = dict()
478 for trace_id in trace_list:
479 traces_info[guid][trace_id] = dict()
480 filepath = self.trace(guid, trace_id, attribute = "path")
481 traces_info[guid][trace_id]["host"] = host
482 traces_info[guid][trace_id]["user"] = user
483 traces_info[guid][trace_id]["filepath"] = filepath
def trace_filepath(self, guid, trace_id):
    """
    Hook that resolves the file path of a trace; it backs
    TestbedController's default implementation of trace().

    This base version has no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
493 #shutdown: NotImplementedError
495 def get_connected(self, guid, connector_type_name,
496 other_connector_type_name):
497 """searchs the connected elements for the specific connector_type_name
499 if guid not in self._connect:
501 # all connections for all connectors for guid
502 all_connections = self._connect[guid]
503 if connector_type_name not in all_connections:
505 # all connections for the specific connector
506 connections = all_connections[connector_type_name]
507 specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
508 in connections.iteritems() if \
509 otr_connector_type_name == other_connector_type_name]
510 return specific_connections
512 def _get_connection_count(self, guid, connection_type_name):
515 if guid in self._connect and connection_type_name in \
517 count = len(self._connect[guid][connection_type_name])
518 if guid in self._cross_connect and connection_type_name in \
519 self._cross_connect[guid]:
520 cross_count = len(self._cross_connect[guid][connection_type_name])
521 return count + cross_count
523 def _get_traces(self, guid):
524 return [] if guid not in self._add_trace else self._add_trace[guid]
526 def _get_parameters(self, guid):
527 return dict() if guid not in self._create_set else \
528 self._create_set[guid]
530 def _get_factory(self, guid):
531 factory_id = self._create[guid]
532 return self._factories[factory_id]
534 def _get_factory_id(self, guid):
535 """ Returns the factory ID of the (perhaps not yet) created object """
536 return self._create.get(guid, None)
538 def _validate_guid(self, guid):
539 if not guid in self._create:
540 raise RuntimeError("Element guid %d doesn't exist" % guid)
542 def _validate_not_guid(self, guid):
543 if guid in self._create:
544 raise AttributeError("Cannot add elements with the same guid: %d" %
547 def _validate_factory_id(self, factory_id):
548 if factory_id not in self._factories:
549 raise AttributeError("Invalid element type %s for testbed version %s" %
550 (factory_id, self._testbed_version))
552 def _validate_testbed_attribute(self, name):
553 if not self._attributes.has_attribute(name):
554 raise AttributeError("Invalid testbed attribute %s for testbed" % \
557 def _validate_testbed_value(self, name, value):
558 if not self._attributes.is_attribute_value_valid(name, value):
559 raise AttributeError("Invalid value %r for testbed attribute %s" % \
562 def _validate_box_attribute(self, guid, name):
563 factory = self._get_factory(guid)
564 if not factory.box_attributes.has_attribute(name):
565 raise AttributeError("Invalid attribute %s for element type %s" %
566 (name, factory.factory_id))
568 def _validate_box_value(self, guid, name, value):
569 factory = self._get_factory(guid)
570 if not factory.box_attributes.is_attribute_value_valid(name, value):
571 raise AttributeError("Invalid value %r for attribute %s" % \
574 def _validate_factory_attribute(self, guid, name):
575 factory = self._get_factory(guid)
576 if not factory.has_attribute(name):
577 raise AttributeError("Invalid attribute %s for element type %s" %
578 (name, factory.factory_id))
580 def _validate_factory_value(self, guid, name, value):
581 factory = self._get_factory(guid)
582 if not factory.is_attribute_value_valid(name, value):
583 raise AttributeError("Invalid value %r for attribute %s" % \
586 def _validate_trace(self, guid, trace_name):
587 factory = self._get_factory(guid)
588 if not trace_name in factory.traces_list:
589 raise RuntimeError("Element type '%s' has no trace '%s'" %
590 (factory.factory_id, trace_name))
592 def _validate_allow_addresses(self, guid):
593 factory = self._get_factory(guid)
594 if not factory.allow_addresses:
595 raise RuntimeError("Element type '%s' doesn't support addresses" %
597 attr_name = "maxAddresses"
598 if guid in self._create_set and attr_name in self._create_set[guid]:
599 max_addresses = self._create_set[guid][attr_name]
601 factory = self._get_factory(guid)
602 max_addresses = factory.box_attributes.get_attribute_value(attr_name)
603 if guid in self._add_address:
604 count_addresses = len(self._add_address[guid])
605 if max_addresses == count_addresses:
606 raise RuntimeError("Element guid %d of type '%s' can't accept \
607 more addresses" % (guid, factory.factory_id))
609 def _validate_allow_routes(self, guid):
610 factory = self._get_factory(guid)
611 if not factory.allow_routes:
612 raise RuntimeError("Element type '%s' doesn't support routes" %
615 def _validate_connection(self, guid1, connector_type_name1, guid2,
616 connector_type_name2, cross = False):
617 # can't connect with self
619 raise AttributeError("Can't connect guid %d to self" % \
621 # the connection is already done, so ignore
622 connected = self.get_connected(guid1, connector_type_name1,
623 connector_type_name2)
624 if guid2 in connected:
626 count1 = self._get_connection_count(guid1, connector_type_name1)
627 factory1 = self._get_factory(guid1)
628 connector_type1 = factory1.connector_type(connector_type_name1)
629 if count1 == connector_type1.max:
630 raise AttributeError("Connector %s is full for guid %d" % \
631 (connector_type_name1, guid1))
def _validate_modify_box_value(self, guid, name):
    """Reject changes to attributes that are fixed at this point.

    Once the controller status is greater than ``TS.STATUS_STARTED``, an
    attribute that the factory's box attributes report as exec-read-only
    or exec-immutable cannot be changed and AttributeError is raised. At
    any other status the call returns without consulting the attribute
    flags.
    """
    factory = self._get_factory(guid)
    if not (self._status > TS.STATUS_STARTED):
        return
    # The read-only flag is asked first; the immutable flag is consulted
    # only when the attribute is not read-only.
    if factory.box_attributes.is_attribute_exec_read_only(name):
        locked = True
    else:
        locked = factory.box_attributes.is_attribute_exec_immutable(name)
    if locked:
        raise AttributeError("Attribute %s can only be modified during experiment design" % name)