1 # -*- coding: utf-8 -*-
3 from nepi.core import execute
4 from nepi.core.metadata import Metadata, Parallel
5 from nepi.util import validation
6 from nepi.util.constants import TIME_NOW, \
7 ApplicationStatus as AS, \
10 from nepi.util.parallel import ParallelRun
16 class TestbedController(execute.TestbedController):
17 def __init__(self, testbed_id, testbed_version):
18 super(TestbedController, self).__init__(testbed_id, testbed_version)
19 self._status = TS.STATUS_ZERO
20 # testbed attributes for validation
21 self._attributes = None
22 # element factories for validation
23 self._factories = dict()
25 # experiment construction instructions
27 self._create_set = dict()
28 self._factory_set = dict()
29 self._connect = dict()
30 self._cross_connect = dict()
31 self._add_trace = dict()
32 self._add_address = dict()
33 self._add_route = dict()
34 self._configure = dict()
36 # log of set operations
41 # testbed element instances
42 self._elements = dict()
44 self._metadata = Metadata(self._testbed_id)
45 if self._metadata.testbed_version != testbed_version:
46 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
47 (testbed_id, testbed_version, self._metadata.testbed_version))
48 for factory in self._metadata.build_factories():
49 self._factories[factory.factory_id] = factory
50 self._attributes = self._metadata.testbed_attributes()
51 self._root_directory = None
54 self._logger = logging.getLogger("nepi.core.testbed_impl")
57 def root_directory(self):
58 return self._root_directory
62 return self._create.keys()
68 def defer_configure(self, name, value):
69 self._validate_testbed_attribute(name)
70 self._validate_testbed_value(name, value)
71 self._attributes.set_attribute_value(name, value)
72 self._configure[name] = value
74 def defer_create(self, guid, factory_id):
75 self._validate_factory_id(factory_id)
76 self._validate_not_guid(guid)
77 self._create[guid] = factory_id
79 def defer_create_set(self, guid, name, value):
80 self._validate_guid(guid)
81 self._validate_box_attribute(guid, name)
82 self._validate_box_value(guid, name, value)
83 if guid not in self._create_set:
84 self._create_set[guid] = dict()
85 self._create_set[guid][name] = value
87 def defer_factory_set(self, guid, name, value):
88 self._validate_guid(guid)
89 self._validate_factory_attribute(guid, name)
90 self._validate_factory_value(guid, name, value)
91 if guid not in self._factory_set:
92 self._factory_set[guid] = dict()
93 self._factory_set[guid][name] = value
95 def defer_connect(self, guid1, connector_type_name1, guid2,
96 connector_type_name2):
97 self._validate_guid(guid1)
98 self._validate_guid(guid2)
99 factory1 = self._get_factory(guid1)
100 factory_id2 = self._create[guid2]
101 connector_type = factory1.connector_type(connector_type_name1)
102 connector_type.can_connect(self._testbed_id, factory_id2,
103 connector_type_name2, False)
104 self._validate_connection(guid1, connector_type_name1, guid2,
105 connector_type_name2)
107 if not guid1 in self._connect:
108 self._connect[guid1] = dict()
109 if not connector_type_name1 in self._connect[guid1]:
110 self._connect[guid1][connector_type_name1] = dict()
111 self._connect[guid1][connector_type_name1][guid2] = \
113 if not guid2 in self._connect:
114 self._connect[guid2] = dict()
115 if not connector_type_name2 in self._connect[guid2]:
116 self._connect[guid2][connector_type_name2] = dict()
117 self._connect[guid2][connector_type_name2][guid1] = \
120 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
121 cross_testbed_guid, cross_testbed_id, cross_factory_id,
122 cross_connector_type_name):
123 self._validate_guid(guid)
124 factory = self._get_factory(guid)
125 connector_type = factory.connector_type(connector_type_name)
126 connector_type.can_connect(cross_testbed_id, cross_factory_id,
127 cross_connector_type_name, True)
128 self._validate_connection(guid, connector_type_name, cross_guid,
129 cross_connector_type_name)
131 if not guid in self._cross_connect:
132 self._cross_connect[guid] = dict()
133 if not connector_type_name in self._cross_connect[guid]:
134 self._cross_connect[guid][connector_type_name] = dict()
135 self._cross_connect[guid][connector_type_name] = \
136 (cross_guid, cross_testbed_guid, cross_testbed_id,
137 cross_factory_id, cross_connector_type_name)
139 def defer_add_trace(self, guid, trace_name):
140 self._validate_guid(guid)
141 self._validate_trace(guid, trace_name)
142 if not guid in self._add_trace:
143 self._add_trace[guid] = list()
144 self._add_trace[guid].append(trace_name)
146 def defer_add_address(self, guid, address, netprefix, broadcast):
147 self._validate_guid(guid)
148 self._validate_allow_addresses(guid)
149 if guid not in self._add_address:
150 self._add_address[guid] = list()
151 self._add_address[guid].append((address, netprefix, broadcast))
153 def defer_add_route(self, guid, destination, netprefix, nexthop,
154 metric = 0, device = None):
155 self._validate_guid(guid)
156 self._validate_allow_routes(guid)
157 if not guid in self._add_route:
158 self._add_route[guid] = list()
159 self._add_route[guid].append((destination, netprefix, nexthop,
163 self._root_directory = self._attributes.\
164 get_attribute_value("rootDirectory")
165 self._status = TS.STATUS_SETUP
168 def set_params(self, guid):
169 parameters = self._get_parameters(guid)
170 for name, value in parameters.iteritems():
171 self.set(guid, name, value)
173 self._do_in_factory_order(
175 self._metadata.create_order,
176 postaction = set_params )
177 self._status = TS.STATUS_CREATED
179 def _do_connect(self, init = True):
180 unconnected = copy.deepcopy(self._connect)
183 for guid1, connections in unconnected.items():
184 factory1 = self._get_factory(guid1)
185 for connector_type_name1, connections2 in connections.items():
186 connector_type1 = factory1.connector_type(connector_type_name1)
187 for guid2, connector_type_name2 in connections2.items():
188 factory_id2 = self._create[guid2]
189 # Connections are executed in a "From -> To" direction only
190 # This explicitly ignores the "To -> From" (mirror)
191 # connections of every connection pair.
193 connect_code = connector_type1.connect_to_init_code(
194 self._testbed_id, factory_id2,
195 connector_type_name2,
198 connect_code = connector_type1.connect_to_compl_code(
199 self._testbed_id, factory_id2,
200 connector_type_name2,
204 delay = connect_code(self, guid1, guid2)
206 if delay is not CONNECTION_DELAY:
207 del unconnected[guid1][connector_type_name1][guid2]
208 if not unconnected[guid1][connector_type_name1]:
209 del unconnected[guid1][connector_type_name1]
210 if not unconnected[guid1]:
211 del unconnected[guid1]
213 def do_connect_init(self):
216 def do_connect_compl(self):
217 self._do_connect(init = False)
218 self._status = TS.STATUS_CONNECTED
220 def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
221 logger = self._logger
223 guids = collections.defaultdict(list)
224 # order guids (elements) according to factory_id
225 for guid, factory_id in self._create.iteritems():
226 guids[factory_id].append(guid)
228 # configure elements following the factory_id order
229 for factory_id in order:
230 # Create a parallel runner if we're given a Parallel() wrapper
232 if isinstance(factory_id, Parallel):
233 runner = ParallelRun(factory_id.maxthreads)
234 factory_id = factory_id.factory
236 # omit the factories that have no element to create
237 if factory_id not in guids:
241 factory = self._factories[factory_id]
242 if isinstance(action, basestring) and not getattr(factory, action):
244 def perform_action(guid):
245 if isinstance(action, basestring):
246 getattr(factory, action)(self, guid)
250 postaction(self, guid)
252 # perform the action on all elements, in parallel if so requested
254 logger.debug("TestbedController: Starting parallel %s", action)
257 for guid in guids[factory_id]:
259 logger.debug("TestbedController: Scheduling %s on %s", action, guid)
260 runner.put(perform_action, guid)
262 logger.debug("TestbedController: Performing %s on %s", action, guid)
271 for guid in guids[factory_id]:
273 logger.debug("TestbedController: Scheduling post-%s on %s", action, guid)
274 runner.put(poststep, self, guid)
276 logger.debug("TestbedController: Performing post-%s on %s", action, guid)
282 logger.debug("TestbedController: Finished parallel %s", action)
285 def do_poststep_preconfigure(self, guid):
286 # dummy hook for implementations interested in
287 # two-phase configuration
290 def do_preconfigure(self):
291 self._do_in_factory_order(
292 'preconfigure_function',
293 self._metadata.preconfigure_order,
294 poststep = self.do_poststep_preconfigure )
297 def do_poststep_configure(self, guid):
298 # dummy hook for implementations interested in
299 # two-phase configuration
302 def do_configure(self):
303 self._do_in_factory_order(
304 'configure_function',
305 self._metadata.configure_order,
306 poststep = self.do_poststep_configure )
307 self._status = TS.STATUS_CONFIGURED
309 def do_prestart(self):
310 self._do_in_factory_order(
312 self._metadata.prestart_order )
314 def _do_cross_connect(self, cross_data, init = True):
315 for guid, cross_connections in self._cross_connect.iteritems():
316 factory = self._get_factory(guid)
317 for connector_type_name, cross_connection in \
318 cross_connections.iteritems():
319 connector_type = factory.connector_type(connector_type_name)
320 (cross_guid, cross_testbed_guid, cross_testbed_id,
321 cross_factory_id, cross_connector_type_name) = cross_connection
323 connect_code = connector_type.connect_to_init_code(
324 cross_testbed_id, cross_factory_id,
325 cross_connector_type_name,
328 connect_code = connector_type.connect_to_compl_code(
329 cross_testbed_id, cross_factory_id,
330 cross_connector_type_name,
333 if hasattr(connect_code, "func"):
334 func_name = connect_code.func.__name__
335 elif hasattr(connect_code, "__name__"):
336 func_name = connect_code.__name__
338 func_name = repr(connect_code)
339 self._logger.debug("Cross-connect - guid: %d, connect_code: %s " % (
341 elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
342 connect_code(self, guid, elem_cross_data)
344 def do_cross_connect_init(self, cross_data):
345 self._do_cross_connect(cross_data)
347 def do_cross_connect_compl(self, cross_data):
348 self._do_cross_connect(cross_data, init = False)
349 self._status = TS.STATUS_CROSS_CONNECTED
351 def set(self, guid, name, value, time = TIME_NOW):
352 self._validate_guid(guid)
353 self._validate_box_attribute(guid, name)
354 self._validate_box_value(guid, name, value)
355 self._validate_modify_box_value(guid, name)
356 if guid not in self._set:
357 self._set[guid] = dict()
358 self._setlog[guid] = dict()
359 if time not in self._setlog[guid]:
360 self._setlog[guid][time] = dict()
361 self._setlog[guid][time][name] = value
362 self._set[guid][name] = value
364 def get(self, guid, name, time = TIME_NOW):
366 gets an attribute from box definitions if available.
367 Throws KeyError if the GUID wasn't created
368 through the defer_create interface, and AttributeError if the
369 attribute isn't available (doesn't exist or is design-only)
371 self._validate_guid(guid)
372 self._validate_box_attribute(guid, name)
373 if guid in self._set and name in self._set[guid]:
374 return self._set[guid][name]
375 if guid in self._create_set and name in self._create_set[guid]:
376 return self._create_set[guid][name]
377 # if nothing else found, returns the factory default value
378 factory = self._get_factory(guid)
379 return factory.box_attributes.get_attribute_value(name)
381 def get_route(self, guid, index, attribute):
383 returns information given to defer_add_route.
385 Raises AttributeError if an invalid attribute is requested
386 or if the indexed routing rule does not exist.
388 Raises KeyError if the GUID has not been seen by
391 ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
393 if attribute not in ATTRIBUTES:
394 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
396 attribute_index = ATTRIBUTES.index(attribute)
398 routes = self._add_route.get(guid)
400 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
403 if not (0 <= index < len(addresses)):
404 raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
405 guid, self._testbed_id, index)
407 return routes[index][attribute_index]
409 def get_address(self, guid, index, attribute='Address'):
411 returns information given to defer_add_address
413 Raises AttributeError if an invalid attribute is requested
414 or if the indexed routing rule does not exist.
416 Raises KeyError if the GUID has not been seen by
419 ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
421 if attribute not in ATTRIBUTES:
422 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
424 attribute_index = ATTRIBUTES.index(attribute)
426 addresses = self._add_address.get(guid)
428 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
431 if not (0 <= index < len(addresses)):
432 raise AttributeError, "GUID %r at %s does not have an address #%s" % (
433 guid, self._testbed_id, index)
435 return addresses[index][attribute_index]
437 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
438 factory = self._get_factory(guid)
439 attribute_list = list()
440 return factory.box_attributes.get_attribute_list(filter_flags, exclude)
442 def get_factory_id(self, guid):
443 factory = self._get_factory(guid)
444 return factory.factory_id
446 def start(self, time = TIME_NOW):
447 self._do_in_factory_order(
449 self._metadata.start_order )
450 self._status = TS.STATUS_STARTED
452 #action: NotImplementedError
454 def stop(self, time = TIME_NOW):
455 self._do_in_factory_order(
457 reversed(self._metadata.start_order) )
458 self._status = TS.STATUS_STOPPED
460 def status(self, guid = None):
463 self._validate_guid(guid)
464 factory = self._get_factory(guid)
465 status_function = factory.status_function
467 return status_function(self, guid)
468 return AS.STATUS_UNDETERMINED
470 def testbed_status(self):
473 def trace(self, guid, trace_id, attribute='value'):
474 if attribute == 'value':
475 fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
478 elif attribute == 'path':
479 content = self.trace_filepath(guid, trace_id)
480 elif attribute == 'filename':
481 content = self.trace_filename(guid, trace_id)
486 def traces_info(self):
488 host = self._attributes.get_attribute_value("deployment_host")
489 user = self._attributes.get_attribute_value("deployment_user")
490 for guid, trace_list in self._add_trace.iteritems():
491 traces_info[guid] = dict()
492 for trace_id in trace_list:
493 traces_info[guid][trace_id] = dict()
494 filepath = self.trace(guid, trace_id, attribute = "path")
495 traces_info[guid][trace_id]["host"] = host
496 traces_info[guid][trace_id]["user"] = user
497 traces_info[guid][trace_id]["filepath"] = filepath
500 def trace_filepath(self, guid, trace_id):
502 Return a trace's file path, for TestbedController's default
503 implementation of trace()
505 raise NotImplementedError
507 def trace_filename(self, guid, trace_id):
509 Return a trace's file name, for TestbedController's default
510 implementation of trace()
512 raise NotImplementedError
514 #shutdown: NotImplementedError
516 def get_connected(self, guid, connector_type_name,
517 other_connector_type_name):
518 """searchs the connected elements for the specific connector_type_name
520 if guid not in self._connect:
522 # all connections for all connectors for guid
523 all_connections = self._connect[guid]
524 if connector_type_name not in all_connections:
526 # all connections for the specific connector
527 connections = all_connections[connector_type_name]
528 specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
529 in connections.iteritems() if \
530 otr_connector_type_name == other_connector_type_name]
531 return specific_connections
533 def _get_connection_count(self, guid, connection_type_name):
536 if guid in self._connect and connection_type_name in \
538 count = len(self._connect[guid][connection_type_name])
539 if guid in self._cross_connect and connection_type_name in \
540 self._cross_connect[guid]:
541 cross_count = len(self._cross_connect[guid][connection_type_name])
542 return count + cross_count
544 def _get_traces(self, guid):
545 return [] if guid not in self._add_trace else self._add_trace[guid]
547 def _get_parameters(self, guid):
548 return dict() if guid not in self._create_set else \
549 self._create_set[guid]
551 def _get_factory(self, guid):
552 factory_id = self._create[guid]
553 return self._factories[factory_id]
555 def _get_factory_id(self, guid):
556 """ Returns the factory ID of the (perhaps not yet) created object """
557 return self._create.get(guid, None)
559 def _validate_guid(self, guid):
560 if not guid in self._create:
561 raise RuntimeError("Element guid %d doesn't exist" % guid)
563 def _validate_not_guid(self, guid):
564 if guid in self._create:
565 raise AttributeError("Cannot add elements with the same guid: %d" %
568 def _validate_factory_id(self, factory_id):
569 if factory_id not in self._factories:
570 raise AttributeError("Invalid element type %s for testbed version %s" %
571 (factory_id, self._testbed_version))
573 def _validate_testbed_attribute(self, name):
574 if not self._attributes.has_attribute(name):
575 raise AttributeError("Invalid testbed attribute %s for testbed" % \
578 def _validate_testbed_value(self, name, value):
579 if not self._attributes.is_attribute_value_valid(name, value):
580 raise AttributeError("Invalid value %r for testbed attribute %s" % \
583 def _validate_box_attribute(self, guid, name):
584 factory = self._get_factory(guid)
585 if not factory.box_attributes.has_attribute(name):
586 raise AttributeError("Invalid attribute %s for element type %s" %
587 (name, factory.factory_id))
589 def _validate_box_value(self, guid, name, value):
590 factory = self._get_factory(guid)
591 if not factory.box_attributes.is_attribute_value_valid(name, value):
592 raise AttributeError("Invalid value %r for attribute %s" % \
595 def _validate_factory_attribute(self, guid, name):
596 factory = self._get_factory(guid)
597 if not factory.has_attribute(name):
598 raise AttributeError("Invalid attribute %s for element type %s" %
599 (name, factory.factory_id))
601 def _validate_factory_value(self, guid, name, value):
602 factory = self._get_factory(guid)
603 if not factory.is_attribute_value_valid(name, value):
604 raise AttributeError("Invalid value %r for attribute %s" % \
607 def _validate_trace(self, guid, trace_name):
608 factory = self._get_factory(guid)
609 if not trace_name in factory.traces_list:
610 raise RuntimeError("Element type '%s' has no trace '%s'" %
611 (factory.factory_id, trace_name))
613 def _validate_allow_addresses(self, guid):
614 factory = self._get_factory(guid)
615 if not factory.allow_addresses:
616 raise RuntimeError("Element type '%s' doesn't support addresses" %
618 attr_name = "maxAddresses"
619 if guid in self._create_set and attr_name in self._create_set[guid]:
620 max_addresses = self._create_set[guid][attr_name]
622 factory = self._get_factory(guid)
623 max_addresses = factory.box_attributes.get_attribute_value(attr_name)
624 if guid in self._add_address:
625 count_addresses = len(self._add_address[guid])
626 if max_addresses == count_addresses:
627 raise RuntimeError("Element guid %d of type '%s' can't accept \
628 more addresses" % (guid, factory.factory_id))
630 def _validate_allow_routes(self, guid):
631 factory = self._get_factory(guid)
632 if not factory.allow_routes:
633 raise RuntimeError("Element type '%s' doesn't support routes" %
636 def _validate_connection(self, guid1, connector_type_name1, guid2,
637 connector_type_name2, cross = False):
638 # can't connect with self
640 raise AttributeError("Can't connect guid %d to self" % \
642 # the connection is already done, so ignore
643 connected = self.get_connected(guid1, connector_type_name1,
644 connector_type_name2)
645 if guid2 in connected:
647 count1 = self._get_connection_count(guid1, connector_type_name1)
648 factory1 = self._get_factory(guid1)
649 connector_type1 = factory1.connector_type(connector_type_name1)
650 if count1 == connector_type1.max:
651 raise AttributeError("Connector %s is full for guid %d" % \
652 (connector_type_name1, guid1))
654 def _validate_modify_box_value(self, guid, name):
655 factory = self._get_factory(guid)
656 if self._status > TS.STATUS_STARTED and \
657 (factory.box_attributes.is_attribute_exec_read_only(name) or \
658 factory.box_attributes.is_attribute_exec_immutable(name)):
659 raise AttributeError("Attribute %s can only be modified during experiment design" % name)