2 # -*- coding: utf-8 -*-
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata, Parallel
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8 ApplicationStatus as AS, \
11 from nepi.util.parallel import ParallelRun
17 class TestbedController(execute.TestbedController):
def __init__(self, testbed_id, testbed_version):
    """Create an empty controller and load the testbed metadata.

    testbed_id / testbed_version are forwarded to the base class; the
    metadata looked up for the testbed must report the same version.

    All the deferred-instruction tables start out empty and are filled
    by the defer_* methods; the factories and the testbed attributes
    are taken from the metadata.

    Raises RuntimeError if the metadata's testbed version differs from
    the requested testbed_version.
    """
    super(TestbedController, self).__init__(testbed_id, testbed_version)
    self._status = TS.STATUS_ZERO
    # testbed attributes for validation
    self._attributes = None
    # element factories for validation
    self._factories = dict()

    # experiment construction instructions
    # guid -> factory_id; read by _get_factory, _validate_guid and friends
    self._create = dict()
    self._create_set = dict()
    self._factory_set = dict()
    self._connect = dict()
    self._cross_connect = dict()
    self._add_trace = dict()
    self._add_address = dict()
    self._add_route = dict()
    self._configure = dict()

    # log of set operations
    # guid -> name -> latest value, written by set() and read by get()
    self._set = dict()
    # guid -> time -> name -> value, written by set()
    self._setlog = dict()

    # testbed element instances
    self._elements = dict()

    self._metadata = Metadata(self._testbed_id)
    if self._metadata.testbed_version != testbed_version:
        raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
                (testbed_id, testbed_version, self._metadata.testbed_version))
    for factory in self._metadata.build_factories():
        self._factories[factory.factory_id] = factory
    self._attributes = self._metadata.testbed_attributes()
    self._root_directory = None

    self._logger = logging.getLogger("nepi.core.testbed_impl")
def root_directory(self):
    """Return the root directory currently stored on the controller.

    This is simply the value of ``_root_directory``; no lookup or
    validation is performed here.
    """
    directory = self._root_directory
    return directory
63 return self._create.keys()
def defer_configure(self, name, value):
    """Validate and record a testbed-level attribute value.

    name: testbed attribute name; checked by _validate_testbed_attribute.
    value: new value; checked by _validate_testbed_value.

    The value is applied to the testbed attribute set and also
    remembered in the _configure table.  Whatever the two validators
    raise propagates to the caller before anything is stored.
    """
    # Name first, then value: nothing is written unless both checks pass.
    self._validate_testbed_attribute(name)
    self._validate_testbed_value(name, value)

    testbed_attributes = self._attributes
    testbed_attributes.set_attribute_value(name, value)

    configured = self._configure
    configured[name] = value
def defer_create(self, guid, factory_id):
    """Register a new element to be created from the given factory.

    guid: identifier of the new element; must not be registered yet.
    factory_id: identifier of the factory (element type) to use.

    Both arguments are validated (factory first, then guid) and any
    exception raised by the validators propagates; only then is the
    guid -> factory_id association stored in the _create table.
    """
    self._validate_factory_id(factory_id)
    self._validate_not_guid(guid)

    registry = self._create
    registry[guid] = factory_id
def defer_create_set(self, guid, name, value):
    """Record a design-time attribute value for an element.

    guid: element identifier; must already be registered.
    name: box attribute name; must exist for the element's factory.
    value: value to store; must be valid for that attribute.

    The value lands in the _create_set table (guid -> name -> value).
    Exceptions raised by the validators propagate unchanged.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    self._validate_box_value(guid, name, value)

    # Create the per-element table on first use, then store the value.
    self._create_set.setdefault(guid, dict())[name] = value
def defer_factory_set(self, guid, name, value):
    """Record a factory attribute value for an element.

    guid: element identifier; must already be registered.
    name: factory attribute name; must exist for the element's factory.
    value: value to store; must be valid for that attribute.

    The value lands in the _factory_set table (guid -> name -> value).
    Exceptions raised by the validators propagate unchanged.
    """
    self._validate_guid(guid)
    self._validate_factory_attribute(guid, name)
    self._validate_factory_value(guid, name, value)

    # Create the per-element table on first use, then store the value.
    self._factory_set.setdefault(guid, dict())[name] = value
def defer_connect(self, guid1, connector_type_name1, guid2,
        connector_type_name2):
    """Register a connection between two elements of this testbed.

    guid1 / connector_type_name1: first element and the connector used
        on its side.
    guid2 / connector_type_name2: second element and the connector used
        on its side.

    The connection is stored in both directions in the _connect table:
    _connect[guid][connector][peer_guid] = peer_connector.

    Exceptions raised by _validate_guid, by the connector type's
    can_connect() or by _validate_connection propagate unchanged, and
    nothing is stored in that case.
    """
    self._validate_guid(guid1)
    self._validate_guid(guid2)
    factory1 = self._get_factory(guid1)
    factory_id2 = self._create[guid2]
    connector_type = factory1.connector_type(connector_type_name1)
    # NOTE(review): the return value of can_connect() is not inspected;
    # presumably it raises on an incompatible pair -- confirm.
    connector_type.can_connect(self._testbed_id, factory_id2,
            connector_type_name2, False)
    self._validate_connection(guid1, connector_type_name1, guid2,
            connector_type_name2)

    # "From -> To" entry: the peer guid maps to the peer's connector name.
    forward = self._connect.setdefault(guid1, dict())
    forward.setdefault(connector_type_name1, dict())[guid2] = \
            connector_type_name2

    # Mirror "To -> From" entry, so lookups work from either end.
    mirror = self._connect.setdefault(guid2, dict())
    mirror.setdefault(connector_type_name2, dict())[guid1] = \
            connector_type_name1
def defer_cross_connect(self, guid, connector_type_name, cross_guid,
        cross_testbed_guid, cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """Register a connection from a local element to another testbed.

    guid / connector_type_name: local element and the connector used.
    cross_guid, cross_testbed_guid, cross_testbed_id, cross_factory_id,
    cross_connector_type_name: description of the remote end; the five
        values are stored together, in that order, as one tuple.

    The _cross_connect table keeps a single remote end per
    (guid, connector_type_name); registering again replaces it.
    Exceptions raised by the validation steps propagate unchanged.
    """
    self._validate_guid(guid)
    local_factory = self._get_factory(guid)
    local_connector = local_factory.connector_type(connector_type_name)
    local_connector.can_connect(cross_testbed_id, cross_factory_id,
            cross_connector_type_name, True)
    self._validate_connection(guid, connector_type_name, cross_guid,
            cross_connector_type_name)

    remote_end = (cross_guid, cross_testbed_guid, cross_testbed_id,
            cross_factory_id, cross_connector_type_name)
    per_element = self._cross_connect.setdefault(guid, dict())
    per_element[connector_type_name] = remote_end
def defer_add_trace(self, guid, trace_name):
    """Enable a trace on an element.

    guid: element identifier; must already be registered.
    trace_name: trace to enable; checked by _validate_trace.

    Trace names are appended to the element's list in the _add_trace
    table in call order; duplicates are not filtered.
    """
    self._validate_guid(guid)
    self._validate_trace(guid, trace_name)
    self._add_trace.setdefault(guid, list()).append(trace_name)
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Register an address for an element.

    guid: element identifier; must already be registered and pass
        _validate_allow_addresses.
    address, netprefix, broadcast: stored together, in that order, as
        one tuple appended to the element's list in _add_address.
    """
    self._validate_guid(guid)
    self._validate_allow_addresses(guid)
    entry = (address, netprefix, broadcast)
    self._add_address.setdefault(guid, list()).append(entry)
def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
    """Register a routing rule for an element.

    guid: element identifier; must already be registered and pass
        _validate_allow_routes.
    destination, netprefix, nexthop, metric: stored together, in that
        order, as one tuple appended to the element's list in
        _add_route.  metric defaults to 0.
    """
    self._validate_guid(guid)
    self._validate_allow_routes(guid)
    rule = (destination, netprefix, nexthop, metric)
    self._add_route.setdefault(guid, list()).append(rule)
162 self._root_directory = self._attributes.\
163 get_attribute_value("rootDirectory")
164 self._status = TS.STATUS_SETUP
167 def set_params(self, guid):
168 parameters = self._get_parameters(guid)
169 for name, value in parameters.iteritems():
170 self.set(guid, name, value)
172 self._do_in_factory_order(
174 self._metadata.create_order,
175 postaction = set_params )
176 self._status = TS.STATUS_CREATED
178 def _do_connect(self, init = True):
179 unconnected = copy.deepcopy(self._connect)
182 for guid1, connections in unconnected.items():
183 factory1 = self._get_factory(guid1)
184 for connector_type_name1, connections2 in connections.items():
185 connector_type1 = factory1.connector_type(connector_type_name1)
186 for guid2, connector_type_name2 in connections2.items():
187 factory_id2 = self._create[guid2]
188 # Connections are executed in a "From -> To" direction only
189 # This explicitly ignores the "To -> From" (mirror)
190 # connections of every connection pair.
192 connect_code = connector_type1.connect_to_init_code(
193 self._testbed_id, factory_id2,
194 connector_type_name2,
197 connect_code = connector_type1.connect_to_compl_code(
198 self._testbed_id, factory_id2,
199 connector_type_name2,
203 delay = connect_code(self, guid1, guid2)
205 if delay is not CONNECTION_DELAY:
206 del unconnected[guid1][connector_type_name1][guid2]
207 if not unconnected[guid1][connector_type_name1]:
208 del unconnected[guid1][connector_type_name1]
209 if not unconnected[guid1]:
210 del unconnected[guid1]
212 def do_connect_init(self):
def do_connect_compl(self):
    """Run the completion phase of the deferred connections.

    Delegates to _do_connect with init = False and, once that returns,
    moves the controller status to TS.STATUS_CONNECTED.
    """
    self._do_connect(init = False)
    # Only reached when the connection pass did not raise.
    self._status = TS.STATUS_CONNECTED
219 def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
220 logger = self._logger
222 guids = collections.defaultdict(list)
223 # order guids (elements) according to factory_id
224 for guid, factory_id in self._create.iteritems():
225 guids[factory_id].append(guid)
227 # configure elements following the factory_id order
228 for factory_id in order:
229 # Create a parallel runner if we're given a Parallel() wrapper
231 if isinstance(factory_id, Parallel):
232 runner = ParallelRun(factory_id.maxthreads)
233 factory_id = factory_id.factory
235 # omit the factories that have no element to create
236 if factory_id not in guids:
240 factory = self._factories[factory_id]
241 if isinstance(action, basestring) and not getattr(factory, action):
243 def perform_action(guid):
244 if isinstance(action, basestring):
245 getattr(factory, action)(self, guid)
249 postaction(self, guid)
251 # perform the action on all elements, in parallel if so requested
253 logger.debug("Starting parallel %s", action)
256 for guid in guids[factory_id]:
258 logger.debug("Scheduling %s on %s", action, guid)
259 runner.put(perform_action, guid)
261 logger.debug("Performing %s on %s", action, guid)
270 for guid in guids[factory_id]:
272 logger.debug("Scheduling post-%s on %s", action, guid)
273 runner.put(poststep, self, guid)
275 logger.debug("Performing post-%s on %s", action, guid)
281 logger.debug("Finished parallel %s", action)
def do_poststep_preconfigure(self, guid):
    """Dummy hook for implementations interested in two-phase
    configuration; the default does nothing."""
    pass
def do_preconfigure(self):
    """Run every factory's 'preconfigure_function' in metadata order.

    The work is delegated to _do_in_factory_order, walking
    _metadata.preconfigure_order and passing do_poststep_preconfigure
    as the poststep hook.
    """
    factory_order = self._metadata.preconfigure_order
    hook = self.do_poststep_preconfigure
    self._do_in_factory_order(
        'preconfigure_function',
        factory_order,
        poststep = hook )
def do_poststep_configure(self, guid):
    """Dummy hook for implementations interested in two-phase
    configuration; the default does nothing."""
    pass
def do_configure(self):
    """Run every factory's 'configure_function' in metadata order.

    The work is delegated to _do_in_factory_order, walking
    _metadata.configure_order and passing do_poststep_configure as the
    poststep hook.  When that returns, the controller status becomes
    TS.STATUS_CONFIGURED.
    """
    factory_order = self._metadata.configure_order
    hook = self.do_poststep_configure
    self._do_in_factory_order(
        'configure_function',
        factory_order,
        poststep = hook )
    # Only reached when the configuration pass did not raise.
    self._status = TS.STATUS_CONFIGURED
308 def do_prestart(self):
309 self._do_in_factory_order(
311 self._metadata.prestart_order )
313 def _do_cross_connect(self, cross_data, init = True):
314 for guid, cross_connections in self._cross_connect.iteritems():
315 factory = self._get_factory(guid)
316 for connector_type_name, cross_connection in \
317 cross_connections.iteritems():
318 connector_type = factory.connector_type(connector_type_name)
319 (cross_guid, cross_testbed_guid, cross_testbed_id,
320 cross_factory_id, cross_connector_type_name) = cross_connection
322 connect_code = connector_type.connect_to_init_code(
323 cross_testbed_id, cross_factory_id,
324 cross_connector_type_name,
327 connect_code = connector_type.connect_to_compl_code(
328 cross_testbed_id, cross_factory_id,
329 cross_connector_type_name,
332 elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
333 connect_code(self, guid, elem_cross_data)
def do_cross_connect_init(self, cross_data):
    """Run the initialization phase of the cross-testbed connections.

    cross_data is handed to _do_cross_connect untouched, leaving that
    method's init argument at its default.
    """
    self._do_cross_connect(cross_data)
def do_cross_connect_compl(self, cross_data):
    """Run the completion phase of the cross-testbed connections.

    Delegates to _do_cross_connect with init = False and, once that
    returns, moves the controller status to TS.STATUS_CROSS_CONNECTED.
    """
    self._do_cross_connect(cross_data, init = False)
    # Only reached when the cross-connection pass did not raise.
    self._status = TS.STATUS_CROSS_CONNECTED
def set(self, guid, name, value, time = TIME_NOW):
    """Set an element attribute and log the change.

    guid: element identifier; must already be registered.
    name: box attribute name; must exist for the element's factory.
    value: new value; must be valid for that attribute.
    time: key under which the change is logged; defaults to TIME_NOW.

    The latest value is kept in _set[guid][name] and every change is
    also recorded in _setlog[guid][time][name].  Exceptions raised by
    the four validation steps propagate before anything is stored.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    self._validate_box_value(guid, name, value)
    self._validate_modify_box_value(guid, name)

    # First set() on this element: start both its value table and its
    # change log from scratch.
    if guid not in self._set:
        self._set[guid] = dict()
        self._setlog[guid] = dict()

    self._setlog[guid].setdefault(time, dict())[name] = value
    self._set[guid][name] = value
def get(self, guid, name, time = TIME_NOW):
    """
    Return the current value of an element attribute.

    Lookup order: the value last stored through set(), then the
    design-time value stored through defer_create_set(), and finally
    the default reported by the element factory's box attributes.

    The time argument is accepted but not used by this implementation.

    Errors come from _validate_guid and _validate_box_attribute, which
    reject GUIDs that were not registered through defer_create and
    attributes the element type does not have.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)

    # Runtime values take precedence over design-time ones.
    for table in (self._set, self._create_set):
        if guid in table and name in table[guid]:
            return table[guid][name]

    # if nothing else found, returns the factory default value
    return self._get_factory(guid).box_attributes.get_attribute_value(name)
def get_route(self, guid, index, attribute):
    """
    Return one field of a routing rule registered with defer_add_route.

    guid: element whose routing rules are queried.
    index: zero-based position of the rule, in registration order;
        an int or a string holding an int.
    attribute: one of 'Destination', 'NetPrefix', 'NextHop', 'Metric'.

    Raises AttributeError if an invalid attribute is requested
    or if the indexed routing rule does not exist.

    Raises KeyError if the GUID has no routing rule registered
    through defer_add_route.
    """
    # Same order as the tuples appended by defer_add_route.
    ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop', 'Metric']

    if attribute not in ATTRIBUTES:
        raise AttributeError("Attribute %r invalid for routes of %r" % (attribute, guid))

    attribute_index = ATTRIBUTES.index(attribute)

    routes = self._add_route.get(guid)
    if not routes:
        raise KeyError("GUID %r not found in %s" % (guid, self._testbed_id))

    # Accept numeric strings too; anything that is not a number can
    # never name an existing rule, so it fails the range check below.
    try:
        position = int(index)
    except (TypeError, ValueError):
        position = -1

    if not (0 <= position < len(routes)):
        raise AttributeError("GUID %r at %s does not have a routing entry #%s" % (
            guid, self._testbed_id, index))

    return routes[position][attribute_index]
def get_address(self, guid, index, attribute='Address'):
    """
    Return one field of an address registered with defer_add_address.

    guid: element whose addresses are queried.
    index: zero-based position of the address, in registration order;
        an int or a string holding an int.
    attribute: one of 'Address', 'NetPrefix', 'Broadcast'.

    Raises AttributeError if an invalid attribute is requested
    or if the indexed address does not exist.

    Raises KeyError if the GUID has no address registered through
    defer_add_address.
    """
    # Same order as the tuples appended by defer_add_address.
    ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']

    if attribute not in ATTRIBUTES:
        raise AttributeError("Attribute %r invalid for addresses of %r" % (attribute, guid))

    attribute_index = ATTRIBUTES.index(attribute)

    addresses = self._add_address.get(guid)
    if not addresses:
        raise KeyError("GUID %r not found in %s" % (guid, self._testbed_id))

    # Accept numeric strings too; anything that is not a number can
    # never name an existing address, so it fails the range check below.
    try:
        position = int(index)
    except (TypeError, ValueError):
        position = -1

    if not (0 <= position < len(addresses)):
        raise AttributeError("GUID %r at %s does not have an address #%s" % (
            guid, self._testbed_id, index))

    return addresses[position][attribute_index]
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """Return the attribute list of an element's type.

    guid: element identifier, resolved to its factory via _get_factory.
    filter_flags, exclude: forwarded untouched to the factory's
        box_attributes.get_attribute_list(); their meaning is defined
        there.

    Returns whatever box_attributes.get_attribute_list() returns.
    """
    factory = self._get_factory(guid)
    return factory.box_attributes.get_attribute_list(filter_flags, exclude)
def get_factory_id(self, guid):
    """Return the factory_id of the factory that builds element guid.

    The factory is resolved through _get_factory, so whatever that
    raises for an unknown guid propagates.
    """
    return self._get_factory(guid).factory_id
437 def start(self, time = TIME_NOW):
438 self._do_in_factory_order(
440 self._metadata.start_order )
441 self._status = TS.STATUS_STARTED
443 #action: NotImplementedError
445 def stop(self, time = TIME_NOW):
446 self._do_in_factory_order(
448 reversed(self._metadata.start_order) )
449 self._status = TS.STATUS_STOPPED
451 def status(self, guid = None):
454 self._validate_guid(guid)
455 factory = self._get_factory(guid)
456 status_function = factory.status_function
458 return status_function(self, guid)
459 return AS.STATUS_UNDETERMINED
461 def testbed_status(self):
464 def trace(self, guid, trace_id, attribute='value'):
465 if attribute == 'value':
466 fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
469 elif attribute == 'path':
470 content = self.trace_filepath(guid, trace_id)
471 elif attribute == 'filename':
472 content = self.trace_filename(guid, trace_id)
def traces_info(self):
    """Describe where every enabled trace can be found.

    Returns a dict guid -> trace_id -> {"host", "user", "filepath"}
    covering every trace registered in _add_trace.  host and user are
    the testbed attributes "deployment_host" and "deployment_user";
    filepath is what trace() reports for attribute = "path".
    """
    host = self._attributes.get_attribute_value("deployment_host")
    user = self._attributes.get_attribute_value("deployment_user")

    info = dict()
    for guid, trace_ids in self._add_trace.iteritems():
        # An element with an empty trace list still gets an (empty) entry.
        per_element = info[guid] = dict()
        for trace_id in trace_ids:
            per_element[trace_id] = {
                "host": host,
                "user": user,
                "filepath": self.trace(guid, trace_id, attribute = "path"),
            }
    return info
def trace_filepath(self, guid, trace_id):
    """
    Hook giving the file path of a trace, used by TestbedController's
    default implementation of trace().

    This base version always raises NotImplementedError.
    """
    raise NotImplementedError()
def trace_filename(self, guid, trace_id):
    """
    Hook giving the file name of a trace, used by TestbedController's
    default implementation of trace().

    This base version always raises NotImplementedError.
    """
    raise NotImplementedError()
505 #shutdown: NotImplementedError
def get_connected(self, guid, connector_type_name,
        other_connector_type_name):
    """Return the guids connected to guid through a given connector pair.

    guid: element whose connections are searched.
    connector_type_name: connector on guid's side.
    other_connector_type_name: connector required on the peer's side.

    Returns a list of peer guids; the list is empty when guid has no
    connections at all or none on connector_type_name.
    """
    try:
        # all connections for the specific connector of guid
        connections = self._connect[guid][connector_type_name]
    except KeyError:
        return []

    peers = []
    for otr_guid, otr_connector_type_name in connections.iteritems():
        if otr_connector_type_name == other_connector_type_name:
            peers.append(otr_guid)
    return peers
524 def _get_connection_count(self, guid, connection_type_name):
527 if guid in self._connect and connection_type_name in \
529 count = len(self._connect[guid][connection_type_name])
530 if guid in self._cross_connect and connection_type_name in \
531 self._cross_connect[guid]:
532 cross_count = len(self._cross_connect[guid][connection_type_name])
533 return count + cross_count
535 def _get_traces(self, guid):
536 return [] if guid not in self._add_trace else self._add_trace[guid]
538 def _get_parameters(self, guid):
539 return dict() if guid not in self._create_set else \
540 self._create_set[guid]
542 def _get_factory(self, guid):
543 factory_id = self._create[guid]
544 return self._factories[factory_id]
546 def _get_factory_id(self, guid):
547 """ Returns the factory ID of the (perhaps not yet) created object """
548 return self._create.get(guid, None)
550 def _validate_guid(self, guid):
551 if not guid in self._create:
552 raise RuntimeError("Element guid %d doesn't exist" % guid)
554 def _validate_not_guid(self, guid):
555 if guid in self._create:
556 raise AttributeError("Cannot add elements with the same guid: %d" %
559 def _validate_factory_id(self, factory_id):
560 if factory_id not in self._factories:
561 raise AttributeError("Invalid element type %s for testbed version %s" %
562 (factory_id, self._testbed_version))
564 def _validate_testbed_attribute(self, name):
565 if not self._attributes.has_attribute(name):
566 raise AttributeError("Invalid testbed attribute %s for testbed" % \
569 def _validate_testbed_value(self, name, value):
570 if not self._attributes.is_attribute_value_valid(name, value):
571 raise AttributeError("Invalid value %r for testbed attribute %s" % \
574 def _validate_box_attribute(self, guid, name):
575 factory = self._get_factory(guid)
576 if not factory.box_attributes.has_attribute(name):
577 raise AttributeError("Invalid attribute %s for element type %s" %
578 (name, factory.factory_id))
580 def _validate_box_value(self, guid, name, value):
581 factory = self._get_factory(guid)
582 if not factory.box_attributes.is_attribute_value_valid(name, value):
583 raise AttributeError("Invalid value %r for attribute %s" % \
586 def _validate_factory_attribute(self, guid, name):
587 factory = self._get_factory(guid)
588 if not factory.has_attribute(name):
589 raise AttributeError("Invalid attribute %s for element type %s" %
590 (name, factory.factory_id))
592 def _validate_factory_value(self, guid, name, value):
593 factory = self._get_factory(guid)
594 if not factory.is_attribute_value_valid(name, value):
595 raise AttributeError("Invalid value %r for attribute %s" % \
598 def _validate_trace(self, guid, trace_name):
599 factory = self._get_factory(guid)
600 if not trace_name in factory.traces_list:
601 raise RuntimeError("Element type '%s' has no trace '%s'" %
602 (factory.factory_id, trace_name))
604 def _validate_allow_addresses(self, guid):
605 factory = self._get_factory(guid)
606 if not factory.allow_addresses:
607 raise RuntimeError("Element type '%s' doesn't support addresses" %
609 attr_name = "maxAddresses"
610 if guid in self._create_set and attr_name in self._create_set[guid]:
611 max_addresses = self._create_set[guid][attr_name]
613 factory = self._get_factory(guid)
614 max_addresses = factory.box_attributes.get_attribute_value(attr_name)
615 if guid in self._add_address:
616 count_addresses = len(self._add_address[guid])
617 if max_addresses == count_addresses:
618 raise RuntimeError("Element guid %d of type '%s' can't accept \
619 more addresses" % (guid, factory.factory_id))
621 def _validate_allow_routes(self, guid):
622 factory = self._get_factory(guid)
623 if not factory.allow_routes:
624 raise RuntimeError("Element type '%s' doesn't support routes" %
627 def _validate_connection(self, guid1, connector_type_name1, guid2,
628 connector_type_name2, cross = False):
629 # can't connect with self
631 raise AttributeError("Can't connect guid %d to self" % \
633 # the connection is already done, so ignore
634 connected = self.get_connected(guid1, connector_type_name1,
635 connector_type_name2)
636 if guid2 in connected:
638 count1 = self._get_connection_count(guid1, connector_type_name1)
639 factory1 = self._get_factory(guid1)
640 connector_type1 = factory1.connector_type(connector_type_name1)
641 if count1 == connector_type1.max:
642 raise AttributeError("Connector %s is full for guid %d" % \
643 (connector_type_name1, guid1))
def _validate_modify_box_value(self, guid, name):
    """Check that box attribute name of guid may still be modified.

    While the controller status is not greater than TS.STATUS_STARTED
    every attribute passes.  Beyond that, attributes that the factory's
    box attributes flag as exec-read-only or exec-immutable are
    rejected.

    Raises AttributeError for a rejected attribute.
    """
    factory = self._get_factory(guid)
    if not self._status > TS.STATUS_STARTED:
        return

    box_attributes = factory.box_attributes
    locked = box_attributes.is_attribute_exec_read_only(name) or \
            box_attributes.is_attribute_exec_immutable(name)
    if locked:
        raise AttributeError("Attribute %s can only be modified during experiment design" % name)