2 # -*- coding: utf-8 -*-
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata, Parallel
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8 ApplicationStatus as AS, \
11 from nepi.util.parallel import ParallelRun
17 class TestbedController(execute.TestbedController):
18 def __init__(self, testbed_id, testbed_version):
19 super(TestbedController, self).__init__(testbed_id, testbed_version)
20 self._status = TS.STATUS_ZERO
21 # testbed attributes for validation
22 self._attributes = None
23 # element factories for validation
24 self._factories = dict()
26 # experiment construction instructions
28 self._create_set = dict()
29 self._factory_set = dict()
30 self._connect = dict()
31 self._cross_connect = dict()
32 self._add_trace = dict()
33 self._add_address = dict()
34 self._add_route = dict()
35 self._configure = dict()
37 # log of set operations
42 # testbed element instances
43 self._elements = dict()
45 self._metadata = Metadata(self._testbed_id)
46 if self._metadata.testbed_version != testbed_version:
47 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
48 (testbed_id, testbed_version, self._metadata.testbed_version))
49 for factory in self._metadata.build_factories():
50 self._factories[factory.factory_id] = factory
51 self._attributes = self._metadata.testbed_attributes()
52 self._root_directory = None
55 self._logger = logging.getLogger("nepi.core.testbed_impl")
58 def root_directory(self):
59 return self._root_directory
63 return self._create.keys()
69 def defer_configure(self, name, value):
70 self._validate_testbed_attribute(name)
71 self._validate_testbed_value(name, value)
72 self._attributes.set_attribute_value(name, value)
73 self._configure[name] = value
75 def defer_create(self, guid, factory_id):
76 self._validate_factory_id(factory_id)
77 self._validate_not_guid(guid)
78 self._create[guid] = factory_id
80 def defer_create_set(self, guid, name, value):
81 self._validate_guid(guid)
82 self._validate_box_attribute(guid, name)
83 self._validate_box_value(guid, name, value)
84 if guid not in self._create_set:
85 self._create_set[guid] = dict()
86 self._create_set[guid][name] = value
88 def defer_factory_set(self, guid, name, value):
89 self._validate_guid(guid)
90 self._validate_factory_attribute(guid, name)
91 self._validate_factory_value(guid, name, value)
92 if guid not in self._factory_set:
93 self._factory_set[guid] = dict()
94 self._factory_set[guid][name] = value
96 def defer_connect(self, guid1, connector_type_name1, guid2,
97 connector_type_name2):
98 self._validate_guid(guid1)
99 self._validate_guid(guid2)
100 factory1 = self._get_factory(guid1)
101 factory_id2 = self._create[guid2]
102 connector_type = factory1.connector_type(connector_type_name1)
103 connector_type.can_connect(self._testbed_id, factory_id2,
104 connector_type_name2, False)
105 self._validate_connection(guid1, connector_type_name1, guid2,
106 connector_type_name2)
108 if not guid1 in self._connect:
109 self._connect[guid1] = dict()
110 if not connector_type_name1 in self._connect[guid1]:
111 self._connect[guid1][connector_type_name1] = dict()
112 self._connect[guid1][connector_type_name1][guid2] = \
114 if not guid2 in self._connect:
115 self._connect[guid2] = dict()
116 if not connector_type_name2 in self._connect[guid2]:
117 self._connect[guid2][connector_type_name2] = dict()
118 self._connect[guid2][connector_type_name2][guid1] = \
121 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
122 cross_testbed_guid, cross_testbed_id, cross_factory_id,
123 cross_connector_type_name):
124 self._validate_guid(guid)
125 factory = self._get_factory(guid)
126 connector_type = factory.connector_type(connector_type_name)
127 connector_type.can_connect(cross_testbed_id, cross_factory_id,
128 cross_connector_type_name, True)
129 self._validate_connection(guid, connector_type_name, cross_guid,
130 cross_connector_type_name)
132 if not guid in self._cross_connect:
133 self._cross_connect[guid] = dict()
134 if not connector_type_name in self._cross_connect[guid]:
135 self._cross_connect[guid][connector_type_name] = dict()
136 self._cross_connect[guid][connector_type_name] = \
137 (cross_guid, cross_testbed_guid, cross_testbed_id,
138 cross_factory_id, cross_connector_type_name)
140 def defer_add_trace(self, guid, trace_name):
141 self._validate_guid(guid)
142 self._validate_trace(guid, trace_name)
143 if not guid in self._add_trace:
144 self._add_trace[guid] = list()
145 self._add_trace[guid].append(trace_name)
147 def defer_add_address(self, guid, address, netprefix, broadcast):
148 self._validate_guid(guid)
149 self._validate_allow_addresses(guid)
150 if guid not in self._add_address:
151 self._add_address[guid] = list()
152 self._add_address[guid].append((address, netprefix, broadcast))
154 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
155 self._validate_guid(guid)
156 self._validate_allow_routes(guid)
157 if not guid in self._add_route:
158 self._add_route[guid] = list()
159 self._add_route[guid].append((destination, netprefix, nexthop, metric))
162 self._root_directory = self._attributes.\
163 get_attribute_value("rootDirectory")
164 self._status = TS.STATUS_SETUP
167 def set_params(self, guid):
168 parameters = self._get_parameters(guid)
169 for name, value in parameters.iteritems():
170 self.set(guid, name, value)
172 self._do_in_factory_order(
174 self._metadata.create_order,
175 postaction = set_params )
176 self._status = TS.STATUS_CREATED
178 def _do_connect(self, init = True):
179 unconnected = copy.deepcopy(self._connect)
182 for guid1, connections in unconnected.items():
183 factory1 = self._get_factory(guid1)
184 for connector_type_name1, connections2 in connections.items():
185 connector_type1 = factory1.connector_type(connector_type_name1)
186 for guid2, connector_type_name2 in connections2.items():
187 factory_id2 = self._create[guid2]
188 # Connections are executed in a "From -> To" direction only
189 # This explicitly ignores the "To -> From" (mirror)
190 # connections of every connection pair.
192 connect_code = connector_type1.connect_to_init_code(
193 self._testbed_id, factory_id2,
194 connector_type_name2,
197 connect_code = connector_type1.connect_to_compl_code(
198 self._testbed_id, factory_id2,
199 connector_type_name2,
203 delay = connect_code(self, guid1, guid2)
205 if delay is not CONNECTION_DELAY:
206 del unconnected[guid1][connector_type_name1][guid2]
207 if not unconnected[guid1][connector_type_name1]:
208 del unconnected[guid1][connector_type_name1]
209 if not unconnected[guid1]:
210 del unconnected[guid1]
212 def do_connect_init(self):
215 def do_connect_compl(self):
216 self._do_connect(init = False)
217 self._status = TS.STATUS_CONNECTED
219 def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
220 logger = self._logger
222 guids = collections.defaultdict(list)
223 # order guids (elements) according to factory_id
224 for guid, factory_id in self._create.iteritems():
225 guids[factory_id].append(guid)
227 # configure elements following the factory_id order
228 for factory_id in order:
229 # Create a parallel runner if we're given a Parallel() wrapper
231 if isinstance(factory_id, Parallel):
232 runner = ParallelRun(factory_id.maxthreads)
233 factory_id = factory_id.factory
235 # omit the factories that have no element to create
236 if factory_id not in guids:
240 factory = self._factories[factory_id]
241 if isinstance(action, basestring) and not getattr(factory, action):
243 def perform_action(guid):
244 if isinstance(action, basestring):
245 getattr(factory, action)(self, guid)
249 postaction(self, guid)
251 # perform the action on all elements, in parallel if so requested
253 logger.debug("TestbedController: Starting parallel %s", action)
256 for guid in guids[factory_id]:
258 logger.debug("TestbedController: Scheduling %s on %s", action, guid)
259 runner.put(perform_action, guid)
261 logger.debug("TestbedController: Performing %s on %s", action, guid)
270 for guid in guids[factory_id]:
272 logger.debug("TestbedController: Scheduling post-%s on %s", action, guid)
273 runner.put(poststep, self, guid)
275 logger.debug("TestbedController: Performing post-%s on %s", action, guid)
281 logger.debug("TestbedController: Finished parallel %s", action)
284 def do_poststep_preconfigure(self, guid):
285 # dummy hook for implementations interested in
286 # two-phase configuration
289 def do_preconfigure(self):
290 self._do_in_factory_order(
291 'preconfigure_function',
292 self._metadata.preconfigure_order,
293 poststep = self.do_poststep_preconfigure )
296 def do_poststep_configure(self, guid):
297 # dummy hook for implementations interested in
298 # two-phase configuration
301 def do_configure(self):
302 self._do_in_factory_order(
303 'configure_function',
304 self._metadata.configure_order,
305 poststep = self.do_poststep_configure )
306 self._status = TS.STATUS_CONFIGURED
308 def do_prestart(self):
309 self._do_in_factory_order(
311 self._metadata.prestart_order )
313 def _do_cross_connect(self, cross_data, init = True):
314 for guid, cross_connections in self._cross_connect.iteritems():
315 factory = self._get_factory(guid)
316 for connector_type_name, cross_connection in \
317 cross_connections.iteritems():
318 connector_type = factory.connector_type(connector_type_name)
319 (cross_guid, cross_testbed_guid, cross_testbed_id,
320 cross_factory_id, cross_connector_type_name) = cross_connection
322 connect_code = connector_type.connect_to_init_code(
323 cross_testbed_id, cross_factory_id,
324 cross_connector_type_name,
327 connect_code = connector_type.connect_to_compl_code(
328 cross_testbed_id, cross_factory_id,
329 cross_connector_type_name,
332 self._logger.debug("Cross-connect: guid: %d, connect_code: %s " % (
333 guid, connect_code.func.__name__))
334 elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
335 connect_code(self, guid, elem_cross_data)
337 def do_cross_connect_init(self, cross_data):
338 self._do_cross_connect(cross_data)
340 def do_cross_connect_compl(self, cross_data):
341 self._do_cross_connect(cross_data, init = False)
342 self._status = TS.STATUS_CROSS_CONNECTED
344 def set(self, guid, name, value, time = TIME_NOW):
345 self._validate_guid(guid)
346 self._validate_box_attribute(guid, name)
347 self._validate_box_value(guid, name, value)
348 self._validate_modify_box_value(guid, name)
349 if guid not in self._set:
350 self._set[guid] = dict()
351 self._setlog[guid] = dict()
352 if time not in self._setlog[guid]:
353 self._setlog[guid][time] = dict()
354 self._setlog[guid][time][name] = value
355 self._set[guid][name] = value
357 def get(self, guid, name, time = TIME_NOW):
359 gets an attribute from box definitions if available.
360 Throws KeyError if the GUID wasn't created
361 through the defer_create interface, and AttributeError if the
362 attribute isn't available (doesn't exist or is design-only)
364 self._validate_guid(guid)
365 self._validate_box_attribute(guid, name)
366 if guid in self._set and name in self._set[guid]:
367 return self._set[guid][name]
368 if guid in self._create_set and name in self._create_set[guid]:
369 return self._create_set[guid][name]
370 # if nothing else found, returns the factory default value
371 factory = self._get_factory(guid)
372 return factory.box_attributes.get_attribute_value(name)
374 def get_route(self, guid, index, attribute):
376 returns information given to defer_add_route.
378 Raises AttributeError if an invalid attribute is requested
379 or if the indexed routing rule does not exist.
381 Raises KeyError if the GUID has not been seen by
384 ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
386 if attribute not in ATTRIBUTES:
387 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
389 attribute_index = ATTRIBUTES.index(attribute)
391 routes = self._add_route.get(guid)
393 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
396 if not (0 <= index < len(addresses)):
397 raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
398 guid, self._testbed_id, index)
400 return routes[index][attribute_index]
402 def get_address(self, guid, index, attribute='Address'):
404 returns information given to defer_add_address
406 Raises AttributeError if an invalid attribute is requested
407 or if the indexed routing rule does not exist.
409 Raises KeyError if the GUID has not been seen by
412 ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
414 if attribute not in ATTRIBUTES:
415 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
417 attribute_index = ATTRIBUTES.index(attribute)
419 addresses = self._add_address.get(guid)
421 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
424 if not (0 <= index < len(addresses)):
425 raise AttributeError, "GUID %r at %s does not have an address #%s" % (
426 guid, self._testbed_id, index)
428 return addresses[index][attribute_index]
430 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
431 factory = self._get_factory(guid)
432 attribute_list = list()
433 return factory.box_attributes.get_attribute_list(filter_flags, exclude)
435 def get_factory_id(self, guid):
436 factory = self._get_factory(guid)
437 return factory.factory_id
439 def start(self, time = TIME_NOW):
440 self._do_in_factory_order(
442 self._metadata.start_order )
443 self._status = TS.STATUS_STARTED
445 #action: NotImplementedError
447 def stop(self, time = TIME_NOW):
448 self._do_in_factory_order(
450 reversed(self._metadata.start_order) )
451 self._status = TS.STATUS_STOPPED
453 def status(self, guid = None):
456 self._validate_guid(guid)
457 factory = self._get_factory(guid)
458 status_function = factory.status_function
460 return status_function(self, guid)
461 return AS.STATUS_UNDETERMINED
463 def testbed_status(self):
466 def trace(self, guid, trace_id, attribute='value'):
467 if attribute == 'value':
468 fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
471 elif attribute == 'path':
472 content = self.trace_filepath(guid, trace_id)
473 elif attribute == 'filename':
474 content = self.trace_filename(guid, trace_id)
479 def traces_info(self):
481 host = self._attributes.get_attribute_value("deployment_host")
482 user = self._attributes.get_attribute_value("deployment_user")
483 for guid, trace_list in self._add_trace.iteritems():
484 traces_info[guid] = dict()
485 for trace_id in trace_list:
486 traces_info[guid][trace_id] = dict()
487 filepath = self.trace(guid, trace_id, attribute = "path")
488 traces_info[guid][trace_id]["host"] = host
489 traces_info[guid][trace_id]["user"] = user
490 traces_info[guid][trace_id]["filepath"] = filepath
493 def trace_filepath(self, guid, trace_id):
495 Return a trace's file path, for TestbedController's default
496 implementation of trace()
498 raise NotImplementedError
500 def trace_filename(self, guid, trace_id):
502 Return a trace's file name, for TestbedController's default
503 implementation of trace()
505 raise NotImplementedError
507 #shutdown: NotImplementedError
509 def get_connected(self, guid, connector_type_name,
510 other_connector_type_name):
511 """searchs the connected elements for the specific connector_type_name
513 if guid not in self._connect:
515 # all connections for all connectors for guid
516 all_connections = self._connect[guid]
517 if connector_type_name not in all_connections:
519 # all connections for the specific connector
520 connections = all_connections[connector_type_name]
521 specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
522 in connections.iteritems() if \
523 otr_connector_type_name == other_connector_type_name]
524 return specific_connections
526 def _get_connection_count(self, guid, connection_type_name):
529 if guid in self._connect and connection_type_name in \
531 count = len(self._connect[guid][connection_type_name])
532 if guid in self._cross_connect and connection_type_name in \
533 self._cross_connect[guid]:
534 cross_count = len(self._cross_connect[guid][connection_type_name])
535 return count + cross_count
537 def _get_traces(self, guid):
538 return [] if guid not in self._add_trace else self._add_trace[guid]
540 def _get_parameters(self, guid):
541 return dict() if guid not in self._create_set else \
542 self._create_set[guid]
544 def _get_factory(self, guid):
545 factory_id = self._create[guid]
546 return self._factories[factory_id]
548 def _get_factory_id(self, guid):
549 """ Returns the factory ID of the (perhaps not yet) created object """
550 return self._create.get(guid, None)
552 def _validate_guid(self, guid):
553 if not guid in self._create:
554 raise RuntimeError("Element guid %d doesn't exist" % guid)
556 def _validate_not_guid(self, guid):
557 if guid in self._create:
558 raise AttributeError("Cannot add elements with the same guid: %d" %
561 def _validate_factory_id(self, factory_id):
562 if factory_id not in self._factories:
563 raise AttributeError("Invalid element type %s for testbed version %s" %
564 (factory_id, self._testbed_version))
566 def _validate_testbed_attribute(self, name):
567 if not self._attributes.has_attribute(name):
568 raise AttributeError("Invalid testbed attribute %s for testbed" % \
571 def _validate_testbed_value(self, name, value):
572 if not self._attributes.is_attribute_value_valid(name, value):
573 raise AttributeError("Invalid value %r for testbed attribute %s" % \
576 def _validate_box_attribute(self, guid, name):
577 factory = self._get_factory(guid)
578 if not factory.box_attributes.has_attribute(name):
579 raise AttributeError("Invalid attribute %s for element type %s" %
580 (name, factory.factory_id))
582 def _validate_box_value(self, guid, name, value):
583 factory = self._get_factory(guid)
584 if not factory.box_attributes.is_attribute_value_valid(name, value):
585 raise AttributeError("Invalid value %r for attribute %s" % \
588 def _validate_factory_attribute(self, guid, name):
589 factory = self._get_factory(guid)
590 if not factory.has_attribute(name):
591 raise AttributeError("Invalid attribute %s for element type %s" %
592 (name, factory.factory_id))
594 def _validate_factory_value(self, guid, name, value):
595 factory = self._get_factory(guid)
596 if not factory.is_attribute_value_valid(name, value):
597 raise AttributeError("Invalid value %r for attribute %s" % \
600 def _validate_trace(self, guid, trace_name):
601 factory = self._get_factory(guid)
602 if not trace_name in factory.traces_list:
603 raise RuntimeError("Element type '%s' has no trace '%s'" %
604 (factory.factory_id, trace_name))
606 def _validate_allow_addresses(self, guid):
607 factory = self._get_factory(guid)
608 if not factory.allow_addresses:
609 raise RuntimeError("Element type '%s' doesn't support addresses" %
611 attr_name = "maxAddresses"
612 if guid in self._create_set and attr_name in self._create_set[guid]:
613 max_addresses = self._create_set[guid][attr_name]
615 factory = self._get_factory(guid)
616 max_addresses = factory.box_attributes.get_attribute_value(attr_name)
617 if guid in self._add_address:
618 count_addresses = len(self._add_address[guid])
619 if max_addresses == count_addresses:
620 raise RuntimeError("Element guid %d of type '%s' can't accept \
621 more addresses" % (guid, factory.factory_id))
623 def _validate_allow_routes(self, guid):
624 factory = self._get_factory(guid)
625 if not factory.allow_routes:
626 raise RuntimeError("Element type '%s' doesn't support routes" %
629 def _validate_connection(self, guid1, connector_type_name1, guid2,
630 connector_type_name2, cross = False):
631 # can't connect with self
633 raise AttributeError("Can't connect guid %d to self" % \
635 # the connection is already done, so ignore
636 connected = self.get_connected(guid1, connector_type_name1,
637 connector_type_name2)
638 if guid2 in connected:
640 count1 = self._get_connection_count(guid1, connector_type_name1)
641 factory1 = self._get_factory(guid1)
642 connector_type1 = factory1.connector_type(connector_type_name1)
643 if count1 == connector_type1.max:
644 raise AttributeError("Connector %s is full for guid %d" % \
645 (connector_type_name1, guid1))
647 def _validate_modify_box_value(self, guid, name):
648 factory = self._get_factory(guid)
649 if self._status > TS.STATUS_STARTED and \
650 (factory.box_attributes.is_attribute_exec_read_only(name) or \
651 factory.box_attributes.is_attribute_exec_immutable(name)):
652 raise AttributeError("Attribute %s can only be modified during experiment design" % name)