2 # -*- coding: utf-8 -*-
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata, Parallel
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8 ApplicationStatus as AS, \
11 from nepi.util.parallel import ParallelRun
17 class TestbedController(execute.TestbedController):
18 def __init__(self, testbed_id, testbed_version):
19 super(TestbedController, self).__init__(testbed_id, testbed_version)
20 self._status = TS.STATUS_ZERO
21 # testbed attributes for validation
22 self._attributes = None
23 # element factories for validation
24 self._factories = dict()
26 # experiment construction instructions
28 self._create_set = dict()
29 self._factory_set = dict()
30 self._connect = dict()
31 self._cross_connect = dict()
32 self._add_trace = dict()
33 self._add_address = dict()
34 self._add_route = dict()
35 self._configure = dict()
37 # log of set operations
42 # testbed element instances
43 self._elements = dict()
45 self._metadata = Metadata(self._testbed_id)
46 if self._metadata.testbed_version != testbed_version:
47 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
48 (testbed_id, testbed_version, self._metadata.testbed_version))
49 for factory in self._metadata.build_factories():
50 self._factories[factory.factory_id] = factory
51 self._attributes = self._metadata.testbed_attributes()
52 self._root_directory = None
55 self._logger = logging.getLogger("nepi.core.testbed_impl")
def root_directory(self):
    """Return the root directory currently recorded for this testbed.

    The value is whatever is held in ``self._root_directory``.
    """
    current_root = self._root_directory
    return current_root
63 return self._create.keys()
def defer_configure(self, name, value):
    """Validate and record a testbed-level attribute value.

    The attribute name and the value are both validated first; any
    exception raised by the validators propagates before anything is
    stored.  The value is then written to the testbed attribute container
    and remembered in the ``_configure`` map under ``name``.
    """
    # Validation happens before any state is touched.
    self._validate_testbed_attribute(name)
    self._validate_testbed_value(name, value)

    self._attributes.set_attribute_value(name, value)
    self._configure.update({name: value})
def defer_create(self, guid, factory_id):
    """Register an element to be created later.

    ``factory_id`` is validated first, then ``guid`` is checked with
    ``_validate_not_guid``; only when both checks pass is the
    ``guid -> factory_id`` pair stored in ``_create``.
    """
    self._validate_factory_id(factory_id)
    self._validate_not_guid(guid)
    self._create.update({guid: factory_id})
def defer_create_set(self, guid, name, value):
    """Record an attribute value to apply to element ``guid``.

    The guid, the attribute name and the value are validated in that
    order; validator exceptions propagate before anything is stored.
    The value is kept in ``_create_set[guid][name]``, creating the
    per-guid mapping on first use.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    self._validate_box_value(guid, name, value)

    box_values = self._create_set.setdefault(guid, dict())
    box_values[name] = value
def defer_factory_set(self, guid, name, value):
    """Record a factory attribute value for element ``guid``.

    The guid, the factory attribute name and the value are validated in
    that order; validator exceptions propagate before anything is stored.
    The value is kept in ``_factory_set[guid][name]``, creating the
    per-guid mapping on first use.
    """
    self._validate_guid(guid)
    self._validate_factory_attribute(guid, name)
    self._validate_factory_value(guid, name, value)

    factory_values = self._factory_set.setdefault(guid, dict())
    factory_values[name] = value
96 def defer_connect(self, guid1, connector_type_name1, guid2,
97 connector_type_name2):
98 self._validate_guid(guid1)
99 self._validate_guid(guid2)
100 factory1 = self._get_factory(guid1)
101 factory_id2 = self._create[guid2]
102 connector_type = factory1.connector_type(connector_type_name1)
103 connector_type.can_connect(self._testbed_id, factory_id2,
104 connector_type_name2, False)
105 self._validate_connection(guid1, connector_type_name1, guid2,
106 connector_type_name2)
108 if not guid1 in self._connect:
109 self._connect[guid1] = dict()
110 if not connector_type_name1 in self._connect[guid1]:
111 self._connect[guid1][connector_type_name1] = dict()
112 self._connect[guid1][connector_type_name1][guid2] = \
114 if not guid2 in self._connect:
115 self._connect[guid2] = dict()
116 if not connector_type_name2 in self._connect[guid2]:
117 self._connect[guid2][connector_type_name2] = dict()
118 self._connect[guid2][connector_type_name2][guid1] = \
def defer_cross_connect(self, guid, connector_type_name, cross_guid,
        cross_testbed_guid, cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """Record a connection from a local element to one in another testbed.

    ``guid`` must already be registered.  The connector named
    ``connector_type_name`` is looked up on the element's factory and its
    ``can_connect`` is invoked with the remote testbed id, factory id and
    connector name; the pair is then passed to ``_validate_connection``.
    Exceptions raised by any of those calls propagate before anything is
    stored.

    The result is stored as
    ``_cross_connect[guid][connector_type_name] = (cross_guid,
    cross_testbed_guid, cross_testbed_id, cross_factory_id,
    cross_connector_type_name)``.
    """
    self._validate_guid(guid)
    factory = self._get_factory(guid)
    connector_type = factory.connector_type(connector_type_name)
    # NOTE(review): the value returned by can_connect() is discarded here;
    # confirm whether rejection is signalled by raising or by the return value.
    connector_type.can_connect(cross_testbed_id, cross_factory_id,
            cross_connector_type_name, True)
    self._validate_connection(guid, connector_type_name, cross_guid,
            cross_connector_type_name)

    # A single tuple is kept per (guid, connector) pair, so a later call for
    # the same pair replaces the earlier entry.  No placeholder dict is
    # created for the connector: it was always overwritten by the tuple.
    self._cross_connect.setdefault(guid, dict())[connector_type_name] = \
            (cross_guid, cross_testbed_guid, cross_testbed_id,
             cross_factory_id, cross_connector_type_name)
def defer_add_trace(self, guid, trace_name):
    """Record that trace ``trace_name`` was requested for element ``guid``.

    The guid and the trace name are validated first.  The name is then
    appended to the list held in ``_add_trace[guid]``, which is created
    on first use.
    """
    self._validate_guid(guid)
    self._validate_trace(guid, trace_name)

    requested = self._add_trace.setdefault(guid, list())
    requested.append(trace_name)
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Record an address entry for element ``guid``.

    The guid is validated and ``_validate_allow_addresses`` is consulted
    first.  The ``(address, netprefix, broadcast)`` tuple is then appended
    to the list held in ``_add_address[guid]``, created on first use.
    """
    self._validate_guid(guid)
    self._validate_allow_addresses(guid)

    entry = (address, netprefix, broadcast)
    self._add_address.setdefault(guid, list()).append(entry)
def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
    """Record a routing entry for element ``guid``.

    The guid is validated and ``_validate_allow_routes`` is consulted
    first.  The ``(destination, netprefix, nexthop, metric)`` tuple is
    then appended to the list held in ``_add_route[guid]``, created on
    first use.  ``metric`` defaults to 0.
    """
    self._validate_guid(guid)
    self._validate_allow_routes(guid)

    entry = (destination, netprefix, nexthop, metric)
    self._add_route.setdefault(guid, list()).append(entry)
162 self._root_directory = self._attributes.\
163 get_attribute_value("rootDirectory")
164 self._status = TS.STATUS_SETUP
167 def set_params(self, guid):
168 parameters = self._get_parameters(guid)
169 for name, value in parameters.iteritems():
170 self.set(guid, name, value)
172 self._do_in_factory_order(
174 self._metadata.create_order,
175 postaction = set_params )
176 self._status = TS.STATUS_CREATED
178 def _do_connect(self, init = True):
179 unconnected = copy.deepcopy(self._connect)
182 for guid1, connections in unconnected.items():
183 factory1 = self._get_factory(guid1)
184 for connector_type_name1, connections2 in connections.items():
185 connector_type1 = factory1.connector_type(connector_type_name1)
186 for guid2, connector_type_name2 in connections2.items():
187 factory_id2 = self._create[guid2]
188 # Connections are executed in a "From -> To" direction only
189 # This explicitly ignores the "To -> From" (mirror)
190 # connections of every connection pair.
192 connect_code = connector_type1.connect_to_init_code(
193 self._testbed_id, factory_id2,
194 connector_type_name2,
197 connect_code = connector_type1.connect_to_compl_code(
198 self._testbed_id, factory_id2,
199 connector_type_name2,
203 delay = connect_code(self, guid1, guid2)
205 if delay is not CONNECTION_DELAY:
206 del unconnected[guid1][connector_type_name1][guid2]
207 if not unconnected[guid1][connector_type_name1]:
208 del unconnected[guid1][connector_type_name1]
209 if not unconnected[guid1]:
210 del unconnected[guid1]
212 def do_connect_init(self):
def do_connect_compl(self):
    """Run the non-init pass of ``_do_connect`` and mark the testbed connected.

    Calls ``_do_connect`` with ``init`` set to False and afterwards sets
    ``_status`` to ``TS.STATUS_CONNECTED``.
    """
    completion_args = {"init": False}
    self._do_connect(**completion_args)
    # The status only changes once the connect pass returned normally.
    self._status = TS.STATUS_CONNECTED
219 def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
220 logger = self._logger
222 guids = collections.defaultdict(list)
223 # order guids (elements) according to factory_id
224 for guid, factory_id in self._create.iteritems():
225 guids[factory_id].append(guid)
227 # configure elements following the factory_id order
228 for factory_id in order:
229 # Create a parallel runner if we're given a Parallel() wrapper
231 if isinstance(factory_id, Parallel):
232 runner = ParallelRun(factory_id.maxthreads)
233 factory_id = factory_id.factory
235 # omit the factories that have no element to create
236 if factory_id not in guids:
240 factory = self._factories[factory_id]
241 if isinstance(action, basestring) and not getattr(factory, action):
243 def perform_action(guid):
244 if isinstance(action, basestring):
245 getattr(factory, action)(self, guid)
249 postaction(self, guid)
251 # perform the action on all elements, in parallel if so requested
253 logger.debug("TestbedController: Starting parallel %s", action)
256 for guid in guids[factory_id]:
258 logger.debug("TestbedController: Scheduling %s on %s", action, guid)
259 runner.put(perform_action, guid)
261 logger.debug("TestbedController: Performing %s on %s", action, guid)
270 for guid in guids[factory_id]:
272 logger.debug("TestbedController: Scheduling post-%s on %s", action, guid)
273 runner.put(poststep, self, guid)
275 logger.debug("TestbedController: Performing post-%s on %s", action, guid)
281 logger.debug("TestbedController: Finished parallel %s", action)
284 def do_poststep_preconfigure(self, guid):
285 # dummy hook for implementations interested in
286 # two-phase configuration
def do_preconfigure(self):
    """Invoke each factory's ``preconfigure_function`` in metadata order.

    Delegates to ``_do_in_factory_order`` using the metadata's
    ``preconfigure_order`` and passes ``do_poststep_preconfigure`` as the
    ``poststep`` hook.
    """
    factory_order = self._metadata.preconfigure_order
    after_step = self.do_poststep_preconfigure
    self._do_in_factory_order(
        'preconfigure_function', factory_order, poststep = after_step)
296 def do_poststep_configure(self, guid):
297 # dummy hook for implementations interested in
298 # two-phase configuration
def do_configure(self):
    """Invoke each factory's ``configure_function`` and mark the testbed configured.

    Delegates to ``_do_in_factory_order`` using the metadata's
    ``configure_order`` with ``do_poststep_configure`` as the ``poststep``
    hook, then sets ``_status`` to ``TS.STATUS_CONFIGURED``.
    """
    factory_order = self._metadata.configure_order
    after_step = self.do_poststep_configure
    self._do_in_factory_order(
        'configure_function', factory_order, poststep = after_step)
    # The status only changes once every factory was processed without error.
    self._status = TS.STATUS_CONFIGURED
308 def do_prestart(self):
309 self._do_in_factory_order(
311 self._metadata.prestart_order )
313 def _do_cross_connect(self, cross_data, init = True):
314 for guid, cross_connections in self._cross_connect.iteritems():
315 factory = self._get_factory(guid)
316 for connector_type_name, cross_connection in \
317 cross_connections.iteritems():
318 connector_type = factory.connector_type(connector_type_name)
319 (cross_guid, cross_testbed_guid, cross_testbed_id,
320 cross_factory_id, cross_connector_type_name) = cross_connection
322 connect_code = connector_type.connect_to_init_code(
323 cross_testbed_id, cross_factory_id,
324 cross_connector_type_name,
327 connect_code = connector_type.connect_to_compl_code(
328 cross_testbed_id, cross_factory_id,
329 cross_connector_type_name,
332 if hasattr(connect_code, "func"):
333 func_name = connect_code.func.__name__
334 elif hasattr(connect_code, "__name__"):
335 func_name = connect_code.__name__
337 func_name = repr(connect_code)
338 self._logger.debug("Cross-connect - guid: %d, connect_code: %s " % (
340 elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
341 connect_code(self, guid, elem_cross_data)
def do_cross_connect_init(self, cross_data):
    """Run ``_do_cross_connect`` on ``cross_data`` with its default ``init`` mode."""
    remote_info = cross_data
    self._do_cross_connect(remote_info)
def do_cross_connect_compl(self, cross_data):
    """Run the non-init cross-connect pass and mark the testbed cross-connected.

    Calls ``_do_cross_connect`` on ``cross_data`` with ``init`` set to
    False and afterwards sets ``_status`` to ``TS.STATUS_CROSS_CONNECTED``.
    """
    completion_args = {"init": False}
    self._do_cross_connect(cross_data, **completion_args)
    # The status only changes once the cross-connect pass returned normally.
    self._status = TS.STATUS_CROSS_CONNECTED
def set(self, guid, name, value, time = TIME_NOW):
    """Validate and store a new value for attribute ``name`` of element ``guid``.

    Four validators run in order (guid, attribute name, value, and whether
    the attribute may be modified); validator exceptions propagate before
    anything is stored.

    The value is written to two places:
      * ``_setlog[guid][time][name]`` - the per-time log of set operations;
      * ``_set[guid][name]`` - the latest value per attribute.
    """
    self._validate_guid(guid)
    self._validate_box_attribute(guid, name)
    self._validate_box_value(guid, name, value)
    self._validate_modify_box_value(guid, name)

    # First set for this guid: start both the value map and its log.
    if guid not in self._set:
        self._set[guid], self._setlog[guid] = dict(), dict()

    history = self._setlog[guid]
    history.setdefault(time, dict())[name] = value
    self._set[guid][name] = value
363 def get(self, guid, name, time = TIME_NOW):
365 gets an attribute from box definitions if available.
366 Throws KeyError if the GUID wasn't created
367 through the defer_create interface, and AttributeError if the
368 attribute isn't available (doesn't exist or is design-only)
370 self._validate_guid(guid)
371 self._validate_box_attribute(guid, name)
372 if guid in self._set and name in self._set[guid]:
373 return self._set[guid][name]
374 if guid in self._create_set and name in self._create_set[guid]:
375 return self._create_set[guid][name]
376 # if nothing else found, returns the factory default value
377 factory = self._get_factory(guid)
378 return factory.box_attributes.get_attribute_value(name)
380 def get_route(self, guid, index, attribute):
382 returns information given to defer_add_route.
384 Raises AttributeError if an invalid attribute is requested
385 or if the indexed routing rule does not exist.
387 Raises KeyError if the GUID has not been seen by
390 ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
392 if attribute not in ATTRIBUTES:
393 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
395 attribute_index = ATTRIBUTES.index(attribute)
397 routes = self._add_route.get(guid)
399 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
402 if not (0 <= index < len(addresses)):
403 raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
404 guid, self._testbed_id, index)
406 return routes[index][attribute_index]
408 def get_address(self, guid, index, attribute='Address'):
410 returns information given to defer_add_address
412 Raises AttributeError if an invalid attribute is requested
413 or if the indexed routing rule does not exist.
415 Raises KeyError if the GUID has not been seen by
418 ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
420 if attribute not in ATTRIBUTES:
421 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
423 attribute_index = ATTRIBUTES.index(attribute)
425 addresses = self._add_address.get(guid)
427 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
430 if not (0 <= index < len(addresses)):
431 raise AttributeError, "GUID %r at %s does not have an address #%s" % (
432 guid, self._testbed_id, index)
434 return addresses[index][attribute_index]
def get_attribute_list(self, guid, filter_flags = None, exclude = False):
    """Return the box attribute list of the factory behind element ``guid``.

    ``filter_flags`` and ``exclude`` are forwarded unchanged to the
    factory's ``box_attributes.get_attribute_list``; their meaning is
    defined there.  A guid unknown to ``_get_factory`` makes that lookup
    raise.
    """
    # The original also built an empty list here that was never used.
    factory = self._get_factory(guid)
    return factory.box_attributes.get_attribute_list(filter_flags, exclude)
def get_factory_id(self, guid):
    """Return the ``factory_id`` of the factory behind element ``guid``.

    The factory is resolved through ``_get_factory``, so an unknown guid
    makes that lookup raise.
    """
    return self._get_factory(guid).factory_id
445 def start(self, time = TIME_NOW):
446 self._do_in_factory_order(
448 self._metadata.start_order )
449 self._status = TS.STATUS_STARTED
451 #action: NotImplementedError
453 def stop(self, time = TIME_NOW):
454 self._do_in_factory_order(
456 reversed(self._metadata.start_order) )
457 self._status = TS.STATUS_STOPPED
459 def status(self, guid = None):
462 self._validate_guid(guid)
463 factory = self._get_factory(guid)
464 status_function = factory.status_function
466 return status_function(self, guid)
467 return AS.STATUS_UNDETERMINED
469 def testbed_status(self):
472 def trace(self, guid, trace_id, attribute='value'):
473 if attribute == 'value':
474 fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
477 elif attribute == 'path':
478 content = self.trace_filepath(guid, trace_id)
479 elif attribute == 'filename':
480 content = self.trace_filename(guid, trace_id)
485 def traces_info(self):
487 host = self._attributes.get_attribute_value("deployment_host")
488 user = self._attributes.get_attribute_value("deployment_user")
489 for guid, trace_list in self._add_trace.iteritems():
490 traces_info[guid] = dict()
491 for trace_id in trace_list:
492 traces_info[guid][trace_id] = dict()
493 filepath = self.trace(guid, trace_id, attribute = "path")
494 traces_info[guid][trace_id]["host"] = host
495 traces_info[guid][trace_id]["user"] = user
496 traces_info[guid][trace_id]["filepath"] = filepath
499 def trace_filepath(self, guid, trace_id):
501 Return a trace's file path, for TestbedController's default
502 implementation of trace()
504 raise NotImplementedError
506 def trace_filename(self, guid, trace_id):
508 Return a trace's file name, for TestbedController's default
509 implementation of trace()
511 raise NotImplementedError
513 #shutdown: NotImplementedError
515 def get_connected(self, guid, connector_type_name,
516 other_connector_type_name):
517 """searchs the connected elements for the specific connector_type_name
519 if guid not in self._connect:
521 # all connections for all connectors for guid
522 all_connections = self._connect[guid]
523 if connector_type_name not in all_connections:
525 # all connections for the specific connector
526 connections = all_connections[connector_type_name]
527 specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
528 in connections.iteritems() if \
529 otr_connector_type_name == other_connector_type_name]
530 return specific_connections
532 def _get_connection_count(self, guid, connection_type_name):
535 if guid in self._connect and connection_type_name in \
537 count = len(self._connect[guid][connection_type_name])
538 if guid in self._cross_connect and connection_type_name in \
539 self._cross_connect[guid]:
540 cross_count = len(self._cross_connect[guid][connection_type_name])
541 return count + cross_count
def _get_traces(self, guid):
    """Return the trace names recorded for ``guid``, or a new empty list."""
    if guid in self._add_trace:
        return self._add_trace[guid]
    return []
def _get_parameters(self, guid):
    """Return the attribute values recorded for ``guid``, or a new empty dict."""
    if guid in self._create_set:
        return self._create_set[guid]
    return dict()
def _get_factory(self, guid):
    """Return the factory object registered for element ``guid``.

    Two plain dictionary lookups are chained (``_create`` and then
    ``_factories``), so a missing key in either raises ``KeyError``.
    """
    return self._factories[self._create[guid]]
def _get_factory_id(self, guid):
    """Return the factory id recorded for ``guid``, or None when there is none.

    Unlike a plain subscript on ``_create``, an unknown guid does not raise.
    """
    factory_id = self._create.get(guid)
    return factory_id
def _validate_guid(self, guid):
    """Raise ``RuntimeError`` unless ``guid`` is a key of ``_create``."""
    if guid in self._create:
        return
    raise RuntimeError("Element guid %d doesn't exist" % guid)
562 def _validate_not_guid(self, guid):
563 if guid in self._create:
564 raise AttributeError("Cannot add elements with the same guid: %d" %
def _validate_factory_id(self, factory_id):
    """Raise ``AttributeError`` unless ``factory_id`` is a key of ``_factories``."""
    if factory_id in self._factories:
        return
    details = (factory_id, self._testbed_version)
    raise AttributeError(
        "Invalid element type %s for testbed version %s" % details)
572 def _validate_testbed_attribute(self, name):
573 if not self._attributes.has_attribute(name):
574 raise AttributeError("Invalid testbed attribute %s for testbed" % \
577 def _validate_testbed_value(self, name, value):
578 if not self._attributes.is_attribute_value_valid(name, value):
579 raise AttributeError("Invalid value %r for testbed attribute %s" % \
def _validate_box_attribute(self, guid, name):
    """Raise ``AttributeError`` unless the factory of ``guid`` has box attribute ``name``.

    The factory is resolved through ``_get_factory``, so an unknown guid
    makes that lookup raise first.
    """
    factory = self._get_factory(guid)
    if factory.box_attributes.has_attribute(name):
        return
    details = (name, factory.factory_id)
    raise AttributeError(
        "Invalid attribute %s for element type %s" % details)
588 def _validate_box_value(self, guid, name, value):
589 factory = self._get_factory(guid)
590 if not factory.box_attributes.is_attribute_value_valid(name, value):
591 raise AttributeError("Invalid value %r for attribute %s" % \
def _validate_factory_attribute(self, guid, name):
    """Raise ``AttributeError`` unless the factory of ``guid`` has attribute ``name``.

    The check is made on the factory object itself (``has_attribute``),
    not on its box attributes.
    """
    factory = self._get_factory(guid)
    if factory.has_attribute(name):
        return
    details = (name, factory.factory_id)
    raise AttributeError(
        "Invalid attribute %s for element type %s" % details)
600 def _validate_factory_value(self, guid, name, value):
601 factory = self._get_factory(guid)
602 if not factory.is_attribute_value_valid(name, value):
603 raise AttributeError("Invalid value %r for attribute %s" % \
def _validate_trace(self, guid, trace_name):
    """Raise ``RuntimeError`` unless ``trace_name`` is in the factory's ``traces_list``.

    The factory is the one resolved for ``guid`` through ``_get_factory``.
    """
    factory = self._get_factory(guid)
    if trace_name in factory.traces_list:
        return
    details = (factory.factory_id, trace_name)
    raise RuntimeError("Element type '%s' has no trace '%s'" % details)
612 def _validate_allow_addresses(self, guid):
613 factory = self._get_factory(guid)
614 if not factory.allow_addresses:
615 raise RuntimeError("Element type '%s' doesn't support addresses" %
617 attr_name = "maxAddresses"
618 if guid in self._create_set and attr_name in self._create_set[guid]:
619 max_addresses = self._create_set[guid][attr_name]
621 factory = self._get_factory(guid)
622 max_addresses = factory.box_attributes.get_attribute_value(attr_name)
623 if guid in self._add_address:
624 count_addresses = len(self._add_address[guid])
625 if max_addresses == count_addresses:
626 raise RuntimeError("Element guid %d of type '%s' can't accept \
627 more addresses" % (guid, factory.factory_id))
629 def _validate_allow_routes(self, guid):
630 factory = self._get_factory(guid)
631 if not factory.allow_routes:
632 raise RuntimeError("Element type '%s' doesn't support routes" %
635 def _validate_connection(self, guid1, connector_type_name1, guid2,
636 connector_type_name2, cross = False):
637 # can't connect with self
639 raise AttributeError("Can't connect guid %d to self" % \
641 # the connection is already done, so ignore
642 connected = self.get_connected(guid1, connector_type_name1,
643 connector_type_name2)
644 if guid2 in connected:
646 count1 = self._get_connection_count(guid1, connector_type_name1)
647 factory1 = self._get_factory(guid1)
648 connector_type1 = factory1.connector_type(connector_type_name1)
649 if count1 == connector_type1.max:
650 raise AttributeError("Connector %s is full for guid %d" % \
651 (connector_type_name1, guid1))
def _validate_modify_box_value(self, guid, name):
    """Reject changes to exec-read-only or exec-immutable attributes late in the run.

    Nothing is checked unless ``_status`` compares greater than
    ``TS.STATUS_STARTED``.  Past that point ``AttributeError`` is raised
    when the factory's box attributes report ``name`` as exec-read-only
    or exec-immutable.
    """
    factory = self._get_factory(guid)
    if not (self._status > TS.STATUS_STARTED):
        return
    box_attributes = factory.box_attributes
    # Short-circuit: the immutable check only runs if read-only is false.
    locked = box_attributes.is_attribute_exec_read_only(name) or \
            box_attributes.is_attribute_exec_immutable(name)
    if locked:
        raise AttributeError("Attribute %s can only be modified during experiment design" % name)