2 # -*- coding: utf-8 -*-
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata, Parallel
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8 ApplicationStatus as AS, \
11 from nepi.util.parallel import ParallelRun
17 class TestbedController(execute.TestbedController):
# Build an empty controller for one testbed: initial status, the dictionaries
# that accumulate deferred construction instructions, and the factories and
# testbed attributes obtained from Metadata.
# Raises RuntimeError when the version reported by Metadata differs from
# testbed_version.
# NOTE(review): several lines of this method are not visible in this listing.
# Other methods use self._create, self._set and self._setlog, but no
# assignment to them appears below - confirm against the full file.
18 def __init__(self, testbed_id, testbed_version):
19 super(TestbedController, self).__init__(testbed_id, testbed_version)
20 self._status = TS.STATUS_ZERO
21 # testbed attributes for validation
22 self._attributes = None
23 # element factories for validation
24 self._factories = dict()
26 # experiment construction instructions
28 self._create_set = dict()
29 self._factory_set = dict()
30 self._connect = dict()
31 self._cross_connect = dict()
32 self._add_trace = dict()
33 self._add_address = dict()
34 self._add_route = dict()
35 self._configure = dict()
37 # log of set operations
42 # testbed element instances
43 self._elements = dict()
# Metadata is built from the testbed id; its version must match the one asked for.
45 self._metadata = Metadata(self._testbed_id)
46 if self._metadata.testbed_version != testbed_version:
47 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
48 (testbed_id, testbed_version, self._metadata.testbed_version))
# Index factories by factory_id; _get_factory looks them up through this dict.
49 for factory in self._metadata.build_factories():
50 self._factories[factory.factory_id] = factory
51 self._attributes = self._metadata.testbed_attributes()
52 self._root_directory = None
55 self._logger = logging.getLogger("nepi.core.testbed_impl")
58 def root_directory(self):
59 return self._root_directory
63 return self._create.keys()
def defer_configure(self, name, value):
    """Record a testbed-level attribute value.

    The attribute name and the value are validated first; the value is
    then written to the testbed attribute container and remembered in
    ``self._configure`` under ``name``.

    Raises whatever the two validators raise for an unknown attribute
    or an invalid value.
    """
    # Validation comes first so a rejected name/value leaves no trace.
    self._validate_testbed_attribute(name)
    self._validate_testbed_value(name, value)
    testbed_attributes = self._attributes
    testbed_attributes.set_attribute_value(name, value)
    self._configure.update({name: value})
75 def defer_create(self, guid, factory_id):
76 self._validate_factory_id(factory_id)
77 self._validate_not_guid(guid)
78 self._create[guid] = factory_id
def defer_create_set(self, guid, name, value):
    """Record a creation-time attribute value for element ``guid``.

    The guid, the attribute name and the value are validated before the
    value is stored in ``self._create_set[guid][name]``.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    self._validate_box_value(guid, name, value)
    # The per-guid dictionary is created on first use.
    box_values = self._create_set.setdefault(guid, dict())
    box_values[name] = value
def defer_factory_set(self, guid, name, value):
    """Record a factory attribute value for element ``guid``.

    The guid, the factory attribute name and the value are validated
    before the value is stored in ``self._factory_set[guid][name]``.
    """
    self._validate_guid(guid)
    self._validate_factory_attribute(guid, name)
    self._validate_factory_value(guid, name, value)
    # The per-guid dictionary is created on first use.
    factory_values = self._factory_set.setdefault(guid, dict())
    factory_values[name] = value
# Record a connection between two already-created elements in self._connect,
# once under guid1/connector_type_name1 and once under
# guid2/connector_type_name2.
# Both guids are validated, the connector type of guid1 is queried with
# can_connect for guid2's factory and connector, and _validate_connection is
# run before anything is stored.
# NOTE(review): the value returned by can_connect is not used in the visible
# code - confirm that it reports an impossible connection by raising.
# NOTE(review): the right-hand sides of the two final assignments are not
# visible in this listing.
96 def defer_connect(self, guid1, connector_type_name1, guid2,
97 connector_type_name2):
98 self._validate_guid(guid1)
99 self._validate_guid(guid2)
100 factory1 = self._get_factory(guid1)
101 factory_id2 = self._create[guid2]
102 connector_type = factory1.connector_type(connector_type_name1)
103 connector_type.can_connect(self._testbed_id, factory_id2,
104 connector_type_name2, False)
105 self._validate_connection(guid1, connector_type_name1, guid2,
106 connector_type_name2)
# Entry keyed by guid1: connector name -> {peer guid: ...}
108 if not guid1 in self._connect:
109 self._connect[guid1] = dict()
110 if not connector_type_name1 in self._connect[guid1]:
111 self._connect[guid1][connector_type_name1] = dict()
112 self._connect[guid1][connector_type_name1][guid2] = \
114 if not guid2 in self._connect:
115 self._connect[guid2] = dict()
116 if not connector_type_name2 in self._connect[guid2]:
117 self._connect[guid2][connector_type_name2] = dict()
118 self._connect[guid2][connector_type_name2][guid1] = \
def defer_cross_connect(self, guid, connector_type_name, cross_guid,
        cross_testbed_guid, cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """Record a connection from a local element to one in another testbed.

    guid / connector_type_name identify the local end; the ``cross_*``
    arguments describe the remote end.  The description is stored as a
    single tuple ``(cross_guid, cross_testbed_guid, cross_testbed_id,
    cross_factory_id, cross_connector_type_name)`` under
    ``self._cross_connect[guid][connector_type_name]``, so a later call
    for the same connector replaces the earlier entry.

    Raises whatever ``_validate_guid`` / ``_validate_connection`` raise.
    """
    self._validate_guid(guid)
    factory = self._get_factory(guid)
    connector_type = factory.connector_type(connector_type_name)
    # NOTE(review): the result of can_connect is not inspected - confirm
    # that it reports an impossible connection by raising.
    connector_type.can_connect(cross_testbed_id, cross_factory_id,
        cross_connector_type_name, True)
    self._validate_connection(guid, connector_type_name, cross_guid,
        cross_connector_type_name)
    # Store the tuple directly; the placeholder dict the previous version
    # created for the connector was overwritten on the very next line.
    connectors = self._cross_connect.setdefault(guid, dict())
    connectors[connector_type_name] = (
        cross_guid, cross_testbed_guid, cross_testbed_id,
        cross_factory_id, cross_connector_type_name)
def defer_add_trace(self, guid, trace_name):
    """Remember that trace ``trace_name`` was requested for element ``guid``.

    The guid and the trace name are validated before the name is
    appended to ``self._add_trace[guid]``.
    """
    self._validate_guid(guid)
    self._validate_trace(guid, trace_name)
    requested = self._add_trace.setdefault(guid, list())
    requested.append(trace_name)
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Append an ``(address, netprefix, broadcast)`` entry for ``guid``.

    The guid is validated and ``_validate_allow_addresses`` is consulted
    before the tuple is appended to ``self._add_address[guid]``.
    """
    self._validate_guid(guid)
    self._validate_allow_addresses(guid)
    entry = (address, netprefix, broadcast)
    self._add_address.setdefault(guid, list()).append(entry)
def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
    """Append a ``(destination, netprefix, nexthop, metric)`` entry for ``guid``.

    The guid is validated and ``_validate_allow_routes`` is consulted
    before the tuple is appended to ``self._add_route[guid]``.
    """
    self._validate_guid(guid)
    self._validate_allow_routes(guid)
    entry = (destination, netprefix, nexthop, metric)
    self._add_route.setdefault(guid, list()).append(entry)
162 self._root_directory = self._attributes.\
163 get_attribute_value("rootDirectory")
164 self._status = TS.STATUS_SETUP
167 def set_params(self, guid):
168 parameters = self._get_parameters(guid)
169 for name, value in parameters.iteritems():
170 self.set(guid, name, value)
172 self._do_in_factory_order(
174 self._metadata.create_order,
175 postaction = set_params )
176 self._status = TS.STATUS_CREATED
# Walk the recorded connections and call the connection code that each
# connector type supplies for its peer; both connect_to_init_code and
# connect_to_compl_code appear below.
# Works on a deep copy of self._connect: an entry is removed from the copy
# (and emptied parents pruned) when the connection code returns anything other
# than CONNECTION_DELAY.
# NOTE(review): the statements that choose between the two variants, the
# closing arguments of those calls and any enclosing retry loop are not visible
# in this listing.
178 def _do_connect(self, init = True):
179 unconnected = copy.deepcopy(self._connect)
182 for guid1, connections in unconnected.items():
183 factory1 = self._get_factory(guid1)
184 for connector_type_name1, connections2 in connections.items():
185 connector_type1 = factory1.connector_type(connector_type_name1)
186 for guid2, connector_type_name2 in connections2.items():
187 factory_id2 = self._create[guid2]
188 # Connections are executed in a "From -> To" direction only
189 # This explicitly ignores the "To -> From" (mirror)
190 # connections of every connection pair.
192 connect_code = connector_type1.connect_to_init_code(
193 self._testbed_id, factory_id2,
194 connector_type_name2,
197 connect_code = connector_type1.connect_to_compl_code(
198 self._testbed_id, factory_id2,
199 connector_type_name2,
203 delay = connect_code(self, guid1, guid2)
# Identity comparison against the CONNECTION_DELAY sentinel.
205 if delay is not CONNECTION_DELAY:
206 del unconnected[guid1][connector_type_name1][guid2]
207 if not unconnected[guid1][connector_type_name1]:
208 del unconnected[guid1][connector_type_name1]
209 if not unconnected[guid1]:
210 del unconnected[guid1]
212 def do_connect_init(self):
def do_connect_compl(self):
    """Run ``_do_connect`` with ``init`` disabled, then set the status to
    ``TS.STATUS_CONNECTED``."""
    run_init_phase = False
    self._do_connect(init = run_init_phase)
    self._status = TS.STATUS_CONNECTED
# Apply an action to the created elements, one factory at a time, following
# the factory ids listed in `order`.
#   action     - when it is a string it is looked up as an attribute of the
#                factory and called with (self, guid).
#   order      - iterable of factory ids; an entry may be a Parallel wrapper,
#                in which case a ParallelRun with its maxthreads is created.
#   postaction - called as postaction(self, guid) inside perform_action.
#   poststep   - scheduled through runner.put(poststep, self, guid) in a second
#                loop over the same guids.
# NOTE(review): many lines of this method (branch conditions, the
# non-parallel calls, runner start/join handling) are not visible in this
# listing - read the notes above as a description of the visible lines only.
219 def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
220 logger = self._logger
222 guids = collections.defaultdict(list)
223 # order guids (elements) according to factory_id
224 for guid, factory_id in self._create.iteritems():
225 guids[factory_id].append(guid)
227 # configure elements following the factory_id order
228 for factory_id in order:
229 # Create a parallel runner if we're given a Parallel() wrapper
231 if isinstance(factory_id, Parallel):
232 runner = ParallelRun(factory_id.maxthreads)
233 factory_id = factory_id.factory
235 # omit the factories that have no element to create
236 if factory_id not in guids:
240 factory = self._factories[factory_id]
241 if isinstance(action, basestring) and not getattr(factory, action):
243 def perform_action(guid):
244 if isinstance(action, basestring):
245 getattr(factory, action)(self, guid)
249 postaction(self, guid)
251 # perform the action on all elements, in parallel if so requested
253 logger.debug("Starting parallel %s", action)
256 for guid in guids[factory_id]:
258 logger.debug("Scheduling %s on %s", action, guid)
259 runner.put(perform_action, guid)
261 logger.debug("Performing %s on %s", action, guid)
270 for guid in guids[factory_id]:
272 logger.debug("Scheduling post-%s on %s", action, guid)
273 runner.put(poststep, self, guid)
275 logger.debug("Performing post-%s on %s", action, guid)
281 logger.debug("Finished parallel %s", action)
# Per-element hook: do_preconfigure passes this bound method as the `poststep`
# argument of _do_in_factory_order.
# NOTE(review): the body is not visible in this listing.
284 def do_poststep_preconfigure(self, guid):
285 # dummy hook for implementations interested in
286 # two-phase configuration
def do_preconfigure(self):
    """Run each factory's ``preconfigure_function`` over its elements.

    Factories are visited in ``self._metadata.preconfigure_order`` and
    ``do_poststep_preconfigure`` is supplied as the post-step hook.
    """
    factory_order = self._metadata.preconfigure_order
    hook = self.do_poststep_preconfigure
    self._do_in_factory_order('preconfigure_function', factory_order,
        poststep = hook)
# Per-element hook: do_configure passes this bound method as the `poststep`
# argument of _do_in_factory_order.
# NOTE(review): the body is not visible in this listing.
296 def do_poststep_configure(self, guid):
297 # dummy hook for implementations interested in
298 # two-phase configuration
def do_configure(self):
    """Run each factory's ``configure_function`` over its elements, then
    set the status to ``TS.STATUS_CONFIGURED``.

    Factories are visited in ``self._metadata.configure_order`` and
    ``do_poststep_configure`` is supplied as the post-step hook.
    """
    factory_order = self._metadata.configure_order
    hook = self.do_poststep_configure
    self._do_in_factory_order('configure_function', factory_order,
        poststep = hook)
    self._status = TS.STATUS_CONFIGURED
# Run one pass of _do_in_factory_order following self._metadata.prestart_order.
# NOTE(review): the action argument of the call is not visible in this listing.
308 def do_prestart(self):
309 self._do_in_factory_order(
311 self._metadata.prestart_order )
# For every recorded cross-testbed connection, obtain connection code from the
# local connector type (connect_to_init_code and connect_to_compl_code both
# appear below) and call it with the remote element's data.
#   cross_data - indexed as cross_data[cross_testbed_guid][cross_guid].
# Each value in self._cross_connect[guid] is unpacked as the 5-tuple
# (cross_guid, cross_testbed_guid, cross_testbed_id, cross_factory_id,
# cross_connector_type_name).
# NOTE(review): the statements choosing between the two variants and the
# closing arguments of those calls are not visible in this listing.
313 def _do_cross_connect(self, cross_data, init = True):
314 for guid, cross_connections in self._cross_connect.iteritems():
315 factory = self._get_factory(guid)
316 for connector_type_name, cross_connection in \
317 cross_connections.iteritems():
318 connector_type = factory.connector_type(connector_type_name)
319 (cross_guid, cross_testbed_guid, cross_testbed_id,
320 cross_factory_id, cross_connector_type_name) = cross_connection
322 connect_code = connector_type.connect_to_init_code(
323 cross_testbed_id, cross_factory_id,
324 cross_connector_type_name,
327 connect_code = connector_type.connect_to_compl_code(
328 cross_testbed_id, cross_factory_id,
329 cross_connector_type_name,
332 elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
333 connect_code(self, guid, elem_cross_data)
def do_cross_connect_init(self, cross_data):
    """Run ``_do_cross_connect`` on ``cross_data`` with ``init`` enabled
    (the helper's default)."""
    self._do_cross_connect(cross_data, init = True)
def do_cross_connect_compl(self, cross_data):
    """Run ``_do_cross_connect`` on ``cross_data`` with ``init`` disabled,
    then set the status to ``TS.STATUS_CROSS_CONNECTED``."""
    run_init_phase = False
    self._do_cross_connect(cross_data, init = run_init_phase)
    self._status = TS.STATUS_CROSS_CONNECTED
def set(self, guid, name, value, time = TIME_NOW):
    """Store ``value`` for attribute ``name`` of element ``guid``.

    The guid, attribute, value and modifiability are validated first.
    The value is written both to the current-value map
    ``self._set[guid][name]`` and to the log
    ``self._setlog[guid][time][name]``.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    self._validate_box_value(guid, name, value)
    self._validate_modify_box_value(guid, name)
    if guid not in self._set:
        # First write for this guid: start both the value map and its log.
        self._set[guid] = dict()
        self._setlog[guid] = dict()
    history = self._setlog[guid]
    history.setdefault(time, dict())[name] = value
    self._set[guid][name] = value
def get(self, guid, name, time = TIME_NOW):
    """
    Look up attribute ``name`` of element ``guid``.

    The value comes from the first place that has it: values written
    through set(), then values given to defer_create_set(), then the
    factory's box attribute value.

    Raises whatever ``_validate_guid`` / ``_validate_box_attribute``
    raise for an unknown guid or attribute.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    # Runtime values take precedence over creation-time values.
    for values_by_guid in (self._set, self._create_set):
        if guid in values_by_guid and name in values_by_guid[guid]:
            return values_by_guid[guid][name]
    # if nothing else found, returns the factory default value
    box_attributes = self._get_factory(guid).box_attributes
    return box_attributes.get_attribute_value(name)
372 def get_route(self, guid, index, attribute):
374 returns information given to defer_add_route.
376 Raises AttributeError if an invalid attribute is requested
377 or if the indexed routing rule does not exist.
379 Raises KeyError if the GUID has not been seen by
382 ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
384 if attribute not in ATTRIBUTES:
385 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
387 attribute_index = ATTRIBUTES.index(attribute)
389 routes = self._add_route.get(guid)
391 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
394 if not (0 <= index < len(addresses)):
395 raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
396 guid, self._testbed_id, index)
398 return routes[index][attribute_index]
# Return one field ('Address', 'NetPrefix' or 'Broadcast') of the index-th
# address tuple recorded for guid in self._add_address.
# The position of the name in ATTRIBUTES is used as the index into the stored
# tuple.
# NOTE(review): the docstring text below says "routing rule" where an address
# entry is meant.
# NOTE(review): the docstring delimiters, the condition guarding the KeyError
# and any statement between it and the bounds check are not visible in this
# listing.
400 def get_address(self, guid, index, attribute='Address'):
402 returns information given to defer_add_address
404 Raises AttributeError if an invalid attribute is requested
405 or if the indexed routing rule does not exist.
407 Raises KeyError if the GUID has not been seen by
410 ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
412 if attribute not in ATTRIBUTES:
413 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
415 attribute_index = ATTRIBUTES.index(attribute)
417 addresses = self._add_address.get(guid)
419 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
422 if not (0 <= index < len(addresses)):
423 raise AttributeError, "GUID %r at %s does not have an address #%s" % (
424 guid, self._testbed_id, index)
426 return addresses[index][attribute_index]
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """Return the attribute list of the factory that created ``guid``.

    ``filter_flags`` and ``exclude`` are passed unchanged to the
    factory's ``box_attributes.get_attribute_list``.

    Raises KeyError (from ``_get_factory``) if the guid is unknown.
    """
    factory = self._get_factory(guid)
    return factory.box_attributes.get_attribute_list(filter_flags, exclude)
def get_factory_id(self, guid):
    """Return the ``factory_id`` of the factory registered for ``guid``."""
    return self._get_factory(guid).factory_id
# Run one pass of _do_in_factory_order following self._metadata.start_order,
# then set the status to TS.STATUS_STARTED.
# The `time` parameter is not used in the visible lines.
# NOTE(review): the action argument of the call is not visible in this listing.
437 def start(self, time = TIME_NOW):
438 self._do_in_factory_order(
440 self._metadata.start_order )
441 self._status = TS.STATUS_STARTED
443 #action: NotImplementedError
# Run one pass of _do_in_factory_order over self._metadata.start_order in
# reverse, then set the status to TS.STATUS_STOPPED.
# The `time` parameter is not used in the visible lines.
# NOTE(review): the action argument of the call is not visible in this listing.
445 def stop(self, time = TIME_NOW):
446 self._do_in_factory_order(
448 reversed(self._metadata.start_order) )
449 self._status = TS.STATUS_STOPPED
# Report the status of element guid by calling its factory's status_function
# with (self, guid); AS.STATUS_UNDETERMINED is the other visible return value.
# NOTE(review): the conditions selecting between the two returns, and the
# handling of guid = None, are not visible in this listing.
451 def status(self, guid = None):
454 self._validate_guid(guid)
455 factory = self._get_factory(guid)
456 status_function = factory.status_function
458 return status_function(self, guid)
459 return AS.STATUS_UNDETERMINED
461 def testbed_status(self):
# Access a trace of element guid.
#   attribute == 'value' - opens the file named by trace_filepath() for reading.
#   attribute == 'path'  - uses the path returned by trace_filepath() itself.
# NOTE(review): the lines that read and close the file, handle other
# attribute values and return the result are not visible in this listing.
464 def trace(self, guid, trace_id, attribute='value'):
465 if attribute == 'value':
466 fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
469 elif attribute == 'path':
470 content = self.trace_filepath(guid, trace_id)
# Build a nested mapping guid -> trace_id -> {"host", "user", "filepath"} for
# every trace recorded in self._add_trace.
# host and user come from the "deployment_host" / "deployment_user" testbed
# attributes; filepath comes from trace(..., attribute = "path").
# NOTE(review): the initialisation of traces_info and the return statement are
# not visible in this listing.
475 def traces_info(self):
477 host = self._attributes.get_attribute_value("deployment_host")
478 user = self._attributes.get_attribute_value("deployment_user")
479 for guid, trace_list in self._add_trace.iteritems():
480 traces_info[guid] = dict()
481 for trace_id in trace_list:
482 traces_info[guid][trace_id] = dict()
483 filepath = self.trace(guid, trace_id, attribute = "path")
484 traces_info[guid][trace_id]["host"] = host
485 traces_info[guid][trace_id]["user"] = user
486 traces_info[guid][trace_id]["filepath"] = filepath
def trace_filepath(self, guid, trace_id):
    """
    Give the file path of a trace; trace() and traces_info() rely on it.

    This base version always raises NotImplementedError.
    """
    raise NotImplementedError()
496 #shutdown: NotImplementedError
# Return the guids connected to `guid` through connector_type_name whose own
# connector is other_connector_type_name, read from self._connect.
# NOTE(review): the end of the docstring and the statements executed when the
# guid or the connector is absent from self._connect are not visible in this
# listing.
498 def get_connected(self, guid, connector_type_name,
499 other_connector_type_name):
500 """searchs the connected elements for the specific connector_type_name
502 if guid not in self._connect:
504 # all connections for all connectors for guid
505 all_connections = self._connect[guid]
506 if connector_type_name not in all_connections:
508 # all connections for the specific connector
509 connections = all_connections[connector_type_name]
510 specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
511 in connections.iteritems() if \
512 otr_connector_type_name == other_connector_type_name]
513 return specific_connections
515 def _get_connection_count(self, guid, connection_type_name):
518 if guid in self._connect and connection_type_name in \
520 count = len(self._connect[guid][connection_type_name])
521 if guid in self._cross_connect and connection_type_name in \
522 self._cross_connect[guid]:
523 cross_count = len(self._cross_connect[guid][connection_type_name])
524 return count + cross_count
526 def _get_traces(self, guid):
527 return [] if guid not in self._add_trace else self._add_trace[guid]
529 def _get_parameters(self, guid):
530 return dict() if guid not in self._create_set else \
531 self._create_set[guid]
533 def _get_factory(self, guid):
534 factory_id = self._create[guid]
535 return self._factories[factory_id]
537 def _get_factory_id(self, guid):
538 """ Returns the factory ID of the (perhaps not yet) created object """
539 return self._create.get(guid, None)
541 def _validate_guid(self, guid):
542 if not guid in self._create:
543 raise RuntimeError("Element guid %d doesn't exist" % guid)
# Raise AttributeError when guid is already registered in self._create.
# NOTE(review): the operand of the % formatting is not visible in this listing.
545 def _validate_not_guid(self, guid):
546 if guid in self._create:
547 raise AttributeError("Cannot add elements with the same guid: %d" %
550 def _validate_factory_id(self, factory_id):
551 if factory_id not in self._factories:
552 raise AttributeError("Invalid element type %s for testbed version %s" %
553 (factory_id, self._testbed_version))
# Raise AttributeError when self._attributes has no attribute called `name`.
# NOTE(review): the operand of the % formatting is not visible in this listing.
555 def _validate_testbed_attribute(self, name):
556 if not self._attributes.has_attribute(name):
557 raise AttributeError("Invalid testbed attribute %s for testbed" % \
# Raise AttributeError when self._attributes rejects `value` for the testbed
# attribute `name`.
# NOTE(review): the operands of the % formatting are not visible in this listing.
560 def _validate_testbed_value(self, name, value):
561 if not self._attributes.is_attribute_value_valid(name, value):
562 raise AttributeError("Invalid value %r for testbed attribute %s" % \
def _validate_box_attribute(self, guid, name):
    """Raise AttributeError unless the factory of ``guid`` declares a box
    attribute called ``name``."""
    factory = self._get_factory(guid)
    if factory.box_attributes.has_attribute(name):
        return
    raise AttributeError("Invalid attribute %s for element type %s" %
        (name, factory.factory_id))
# Raise AttributeError when the box attributes of guid's factory reject
# `value` for attribute `name`.
# NOTE(review): the operands of the % formatting are not visible in this listing.
571 def _validate_box_value(self, guid, name, value):
572 factory = self._get_factory(guid)
573 if not factory.box_attributes.is_attribute_value_valid(name, value):
574 raise AttributeError("Invalid value %r for attribute %s" % \
def _validate_factory_attribute(self, guid, name):
    """Raise AttributeError unless the factory of ``guid`` itself has an
    attribute called ``name``."""
    factory = self._get_factory(guid)
    if factory.has_attribute(name):
        return
    raise AttributeError("Invalid attribute %s for element type %s" %
        (name, factory.factory_id))
# Raise AttributeError when guid's factory rejects `value` for its attribute
# `name`.
# NOTE(review): the operands of the % formatting are not visible in this listing.
583 def _validate_factory_value(self, guid, name, value):
584 factory = self._get_factory(guid)
585 if not factory.is_attribute_value_valid(name, value):
586 raise AttributeError("Invalid value %r for attribute %s" % \
def _validate_trace(self, guid, trace_name):
    """Raise RuntimeError unless ``trace_name`` is listed in the
    ``traces_list`` of the factory of ``guid``."""
    factory = self._get_factory(guid)
    if trace_name in factory.traces_list:
        return
    raise RuntimeError("Element type '%s' has no trace '%s'" %
        (factory.factory_id, trace_name))
# Check that another address may be added to element guid.
# Raises RuntimeError when the factory's allow_addresses is false, or when the
# number of addresses already recorded equals the "maxAddresses" value.
# "maxAddresses" is read from self._create_set[guid] when present there; the
# factory's box attribute value is the other visible source.
# NOTE(review): the operand of the first % formatting and the statement
# between the two max_addresses assignments are not visible in this listing.
# NOTE(review): the last message uses a backslash continuation inside the
# string literal, so any indentation of the following source line becomes part
# of the message text - confirm against the full file.
595 def _validate_allow_addresses(self, guid):
596 factory = self._get_factory(guid)
597 if not factory.allow_addresses:
598 raise RuntimeError("Element type '%s' doesn't support addresses" %
600 attr_name = "maxAddresses"
601 if guid in self._create_set and attr_name in self._create_set[guid]:
602 max_addresses = self._create_set[guid][attr_name]
604 factory = self._get_factory(guid)
605 max_addresses = factory.box_attributes.get_attribute_value(attr_name)
606 if guid in self._add_address:
607 count_addresses = len(self._add_address[guid])
608 if max_addresses == count_addresses:
609 raise RuntimeError("Element guid %d of type '%s' can't accept \
610 more addresses" % (guid, factory.factory_id))
# Raise RuntimeError when the factory of guid has a false allow_routes.
# NOTE(review): the operand of the % formatting is not visible in this listing.
612 def _validate_allow_routes(self, guid):
613 factory = self._get_factory(guid)
614 if not factory.allow_routes:
615 raise RuntimeError("Element type '%s' doesn't support routes" %
# Check that guid1's connector can take a connection to guid2.
# Visible checks: a self-connection raises AttributeError; an already
# recorded connection (guid2 in get_connected(...)) is handled separately;
# AttributeError is raised when the connector's current connection count
# equals connector_type1.max.
# NOTE(review): the self-connection condition, the operand of its message and
# the statement executed for an existing connection are not visible in this
# listing; the `cross` parameter is not used in any visible line.
618 def _validate_connection(self, guid1, connector_type_name1, guid2,
619 connector_type_name2, cross = False):
620 # can't connect with self
622 raise AttributeError("Can't connect guid %d to self" % \
624 # the connection is already done, so ignore
625 connected = self.get_connected(guid1, connector_type_name1,
626 connector_type_name2)
627 if guid2 in connected:
629 count1 = self._get_connection_count(guid1, connector_type_name1)
630 factory1 = self._get_factory(guid1)
631 connector_type1 = factory1.connector_type(connector_type_name1)
632 if count1 == connector_type1.max:
633 raise AttributeError("Connector %s is full for guid %d" % \
634 (connector_type_name1, guid1))
def _validate_modify_box_value(self, guid, name):
    """Reject changes to attributes that are locked at execution time.

    While the status is not greater than ``TS.STATUS_STARTED`` nothing
    is checked.  Past that point AttributeError is raised when the
    factory's box attributes report ``name`` as exec-read-only or
    exec-immutable.
    """
    factory = self._get_factory(guid)
    if not self._status > TS.STATUS_STARTED:
        return
    box_attributes = factory.box_attributes
    locked = box_attributes.is_attribute_exec_read_only(name) or \
            box_attributes.is_attribute_exec_immutable(name)
    if locked:
        raise AttributeError("Attribute %s can only be modified during experiment design" % name)