2 # -*- coding: utf-8 -*-
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata, Parallel
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8 ApplicationStatus as AS, \
11 from nepi.util.parallel import ParallelRun
16 class TestbedController(execute.TestbedController):
17 def __init__(self, testbed_id, testbed_version):
18 super(TestbedController, self).__init__(testbed_id, testbed_version)
19 self._status = TS.STATUS_ZERO
20 # testbed attributes for validation
21 self._attributes = None
22 # element factories for validation
23 self._factories = dict()
25 # experiment construction instructions
27 self._create_set = dict()
28 self._factory_set = dict()
29 self._connect = dict()
30 self._cross_connect = dict()
31 self._add_trace = dict()
32 self._add_address = dict()
33 self._add_route = dict()
34 self._configure = dict()
36 # log of set operations
41 # testbed element instances
42 self._elements = dict()
44 self._metadata = Metadata(self._testbed_id)
45 if self._metadata.testbed_version != testbed_version:
46 raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
47 (testbed_id, testbed_version, self._metadata.testbed_version))
48 for factory in self._metadata.build_factories():
49 self._factories[factory.factory_id] = factory
50 self._attributes = self._metadata.testbed_attributes()
51 self._root_directory = None
54 def root_directory(self):
55 return self._root_directory
59 return self._create.keys()
65 def defer_configure(self, name, value):
66 self._validate_testbed_attribute(name)
67 self._validate_testbed_value(name, value)
68 self._attributes.set_attribute_value(name, value)
69 self._configure[name] = value
71 def defer_create(self, guid, factory_id):
72 self._validate_factory_id(factory_id)
73 self._validate_not_guid(guid)
74 self._create[guid] = factory_id
76 def defer_create_set(self, guid, name, value):
77 self._validate_guid(guid)
78 self._validate_box_attribute(guid, name)
79 self._validate_box_value(guid, name, value)
80 if guid not in self._create_set:
81 self._create_set[guid] = dict()
82 self._create_set[guid][name] = value
84 def defer_factory_set(self, guid, name, value):
85 self._validate_guid(guid)
86 self._validate_factory_attribute(guid, name)
87 self._validate_factory_value(guid, name, value)
88 if guid not in self._factory_set:
89 self._factory_set[guid] = dict()
90 self._factory_set[guid][name] = value
92 def defer_connect(self, guid1, connector_type_name1, guid2,
93 connector_type_name2):
94 self._validate_guid(guid1)
95 self._validate_guid(guid2)
96 factory1 = self._get_factory(guid1)
97 factory_id2 = self._create[guid2]
98 connector_type = factory1.connector_type(connector_type_name1)
99 connector_type.can_connect(self._testbed_id, factory_id2,
100 connector_type_name2, False)
101 self._validate_connection(guid1, connector_type_name1, guid2,
102 connector_type_name2)
104 if not guid1 in self._connect:
105 self._connect[guid1] = dict()
106 if not connector_type_name1 in self._connect[guid1]:
107 self._connect[guid1][connector_type_name1] = dict()
108 self._connect[guid1][connector_type_name1][guid2] = \
110 if not guid2 in self._connect:
111 self._connect[guid2] = dict()
112 if not connector_type_name2 in self._connect[guid2]:
113 self._connect[guid2][connector_type_name2] = dict()
114 self._connect[guid2][connector_type_name2][guid1] = \
117 def defer_cross_connect(self, guid, connector_type_name, cross_guid,
118 cross_testbed_guid, cross_testbed_id, cross_factory_id,
119 cross_connector_type_name):
120 self._validate_guid(guid)
121 factory = self._get_factory(guid)
122 connector_type = factory.connector_type(connector_type_name)
123 connector_type.can_connect(cross_testbed_id, cross_factory_id,
124 cross_connector_type_name, True)
125 self._validate_connection(guid, connector_type_name, cross_guid,
126 cross_connector_type_name)
128 if not guid in self._cross_connect:
129 self._cross_connect[guid] = dict()
130 if not connector_type_name in self._cross_connect[guid]:
131 self._cross_connect[guid][connector_type_name] = dict()
132 self._cross_connect[guid][connector_type_name] = \
133 (cross_guid, cross_testbed_guid, cross_testbed_id,
134 cross_factory_id, cross_connector_type_name)
136 def defer_add_trace(self, guid, trace_name):
137 self._validate_guid(guid)
138 self._validate_trace(guid, trace_name)
139 if not guid in self._add_trace:
140 self._add_trace[guid] = list()
141 self._add_trace[guid].append(trace_name)
143 def defer_add_address(self, guid, address, netprefix, broadcast):
144 self._validate_guid(guid)
145 self._validate_allow_addresses(guid)
146 if guid not in self._add_address:
147 self._add_address[guid] = list()
148 self._add_address[guid].append((address, netprefix, broadcast))
150 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
151 self._validate_guid(guid)
152 self._validate_allow_routes(guid)
153 if not guid in self._add_route:
154 self._add_route[guid] = list()
155 self._add_route[guid].append((destination, netprefix, nexthop, metric))
158 self._root_directory = self._attributes.\
159 get_attribute_value("rootDirectory")
160 self._status = TS.STATUS_SETUP
163 def set_params(self, guid):
164 parameters = self._get_parameters(guid)
165 for name, value in parameters.iteritems():
166 self.set(guid, name, value)
168 self._do_in_factory_order(
170 self._metadata.create_order,
171 postaction = set_params )
172 self._status = TS.STATUS_CREATED
174 def _do_connect(self, init = True):
175 unconnected = copy.deepcopy(self._connect)
178 for guid1, connections in unconnected.items():
179 factory1 = self._get_factory(guid1)
180 for connector_type_name1, connections2 in connections.items():
181 connector_type1 = factory1.connector_type(connector_type_name1)
182 for guid2, connector_type_name2 in connections2.items():
183 factory_id2 = self._create[guid2]
184 # Connections are executed in a "From -> To" direction only
185 # This explicitly ignores the "To -> From" (mirror)
186 # connections of every connection pair.
188 connect_code = connector_type1.connect_to_init_code(
189 self._testbed_id, factory_id2,
190 connector_type_name2,
193 connect_code = connector_type1.connect_to_compl_code(
194 self._testbed_id, factory_id2,
195 connector_type_name2,
199 delay = connect_code(self, guid1, guid2)
201 if delay is not CONNECTION_DELAY:
202 del unconnected[guid1][connector_type_name1][guid2]
203 if not unconnected[guid1][connector_type_name1]:
204 del unconnected[guid1][connector_type_name1]
205 if not unconnected[guid1]:
206 del unconnected[guid1]
208 def do_connect_init(self):
211 def do_connect_compl(self):
212 self._do_connect(init = False)
213 self._status = TS.STATUS_CONNECTED
215 def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
216 guids = collections.defaultdict(list)
217 # order guids (elements) according to factory_id
218 for guid, factory_id in self._create.iteritems():
219 guids[factory_id].append(guid)
221 # configure elements following the factory_id order
222 for factory_id in order:
223 # Create a parallel runner if we're given a Parallel() wrapper
225 if isinstance(factory_id, Parallel):
226 runner = ParallelRun(factory_id.maxthreads)
227 factory_id = factory_id.factory
229 # omit the factories that have no element to create
230 if factory_id not in guids:
234 factory = self._factories[factory_id]
235 if not getattr(factory, action):
237 def perform_action(guid):
238 getattr(factory, action)(self, guid)
240 postaction(self, guid)
242 # perform the action on all elements, in parallel if so requested
245 for guid in guids[factory_id]:
247 runner.put(perform_action, guid)
255 for guid in guids[factory_id]:
259 def do_poststep_preconfigure(self, guid):
260 # dummy hook for implementations interested in
261 # two-phase configuration
264 def do_preconfigure(self):
265 self._do_in_factory_order(
266 'preconfigure_function',
267 self._metadata.preconfigure_order,
268 poststep = self.do_poststep_preconfigure )
271 def do_poststep_configure(self, guid):
272 # dummy hook for implementations interested in
273 # two-phase configuration
276 def do_configure(self):
277 self._do_in_factory_order(
278 'configure_function',
279 self._metadata.configure_order,
280 poststep = self.do_poststep_configure )
281 self._status = TS.STATUS_CONFIGURED
283 def do_prestart(self):
284 self._do_in_factory_order(
286 self._metadata.prestart_order )
288 def _do_cross_connect(self, cross_data, init = True):
289 for guid, cross_connections in self._cross_connect.iteritems():
290 factory = self._get_factory(guid)
291 for connector_type_name, cross_connection in \
292 cross_connections.iteritems():
293 connector_type = factory.connector_type(connector_type_name)
294 (cross_guid, cross_testbed_guid, cross_testbed_id,
295 cross_factory_id, cross_connector_type_name) = cross_connection
297 connect_code = connector_type.connect_to_init_code(
298 cross_testbed_id, cross_factory_id,
299 cross_connector_type_name,
302 connect_code = connector_type.connect_to_compl_code(
303 cross_testbed_id, cross_factory_id,
304 cross_connector_type_name,
307 elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
308 connect_code(self, guid, elem_cross_data)
310 def do_cross_connect_init(self, cross_data):
311 self._do_cross_connect(cross_data)
313 def do_cross_connect_compl(self, cross_data):
314 self._do_cross_connect(cross_data, init = False)
315 self._status = TS.STATUS_CROSS_CONNECTED
317 def set(self, guid, name, value, time = TIME_NOW):
318 self._validate_guid(guid)
319 self._validate_box_attribute(guid, name)
320 self._validate_box_value(guid, name, value)
321 self._validate_modify_box_value(guid, name)
322 if guid not in self._set:
323 self._set[guid] = dict()
324 self._setlog[guid] = dict()
325 if time not in self._setlog[guid]:
326 self._setlog[guid][time] = dict()
327 self._setlog[guid][time][name] = value
328 self._set[guid][name] = value
330 def get(self, guid, name, time = TIME_NOW):
332 gets an attribute from box definitions if available.
333 Throws KeyError if the GUID wasn't created
334 through the defer_create interface, and AttributeError if the
335 attribute isn't available (doesn't exist or is design-only)
337 self._validate_guid(guid)
338 self._validate_box_attribute(guid, name)
339 if guid in self._set and name in self._set[guid]:
340 return self._set[guid][name]
341 if guid in self._create_set and name in self._create_set[guid]:
342 return self._create_set[guid][name]
343 # if nothing else found, returns the factory default value
344 factory = self._get_factory(guid)
345 return factory.box_attributes.get_attribute_value(name)
347 def get_route(self, guid, index, attribute):
349 returns information given to defer_add_route.
351 Raises AttributeError if an invalid attribute is requested
352 or if the indexed routing rule does not exist.
354 Raises KeyError if the GUID has not been seen by
357 ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
359 if attribute not in ATTRIBUTES:
360 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
362 attribute_index = ATTRIBUTES.index(attribute)
364 routes = self._add_route.get(guid)
366 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
369 if not (0 <= index < len(addresses)):
370 raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
371 guid, self._testbed_id, index)
373 return routes[index][attribute_index]
375 def get_address(self, guid, index, attribute='Address'):
377 returns information given to defer_add_address
379 Raises AttributeError if an invalid attribute is requested
380 or if the indexed routing rule does not exist.
382 Raises KeyError if the GUID has not been seen by
385 ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
387 if attribute not in ATTRIBUTES:
388 raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
390 attribute_index = ATTRIBUTES.index(attribute)
392 addresses = self._add_address.get(guid)
394 raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
397 if not (0 <= index < len(addresses)):
398 raise AttributeError, "GUID %r at %s does not have an address #%s" % (
399 guid, self._testbed_id, index)
401 return addresses[index][attribute_index]
403 def get_attribute_list(self, guid, filter_flags = None, exclude = False):
404 factory = self._get_factory(guid)
405 attribute_list = list()
406 return factory.box_attributes.get_attribute_list(filter_flags, exclude)
408 def get_factory_id(self, guid):
409 factory = self._get_factory(guid)
410 return factory.factory_id
412 def start(self, time = TIME_NOW):
413 self._do_in_factory_order(
415 self._metadata.start_order )
416 self._status = TS.STATUS_STARTED
418 #action: NotImplementedError
420 def stop(self, time = TIME_NOW):
421 self._do_in_factory_order(
423 reversed(self._metadata.start_order) )
424 self._status = TS.STATUS_STOPPED
426 def status(self, guid = None):
429 self._validate_guid(guid)
430 factory = self._get_factory(guid)
431 status_function = factory.status_function
433 return status_function(self, guid)
434 return AS.STATUS_UNDETERMINED
436 def trace(self, guid, trace_id, attribute='value'):
437 if attribute == 'value':
438 fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
441 elif attribute == 'path':
442 content = self.trace_filepath(guid, trace_id)
447 def traces_info(self):
449 host = self._attributes.get_attribute_value("deployment_host")
450 user = self._attributes.get_attribute_value("deployment_user")
451 for guid, trace_list in self._add_trace.iteritems():
452 traces_info[guid] = dict()
453 for trace_id in trace_list:
454 traces_info[guid][trace_id] = dict()
455 filepath = self.trace(guid, trace_id, attribute = "path")
456 traces_info[guid][trace_id]["host"] = host
457 traces_info[guid][trace_id]["user"] = user
458 traces_info[guid][trace_id]["filepath"] = filepath
461 def trace_filepath(self, guid, trace_id):
463 Return a trace's file path, for TestbedController's default
464 implementation of trace()
466 raise NotImplementedError
468 #shutdown: NotImplementedError
470 def get_connected(self, guid, connector_type_name,
471 other_connector_type_name):
472 """searchs the connected elements for the specific connector_type_name
474 if guid not in self._connect:
476 # all connections for all connectors for guid
477 all_connections = self._connect[guid]
478 if connector_type_name not in all_connections:
480 # all connections for the specific connector
481 connections = all_connections[connector_type_name]
482 specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
483 in connections.iteritems() if \
484 otr_connector_type_name == other_connector_type_name]
485 return specific_connections
487 def _get_connection_count(self, guid, connection_type_name):
490 if guid in self._connect and connection_type_name in \
492 count = len(self._connect[guid][connection_type_name])
493 if guid in self._cross_connect and connection_type_name in \
494 self._cross_connect[guid]:
495 cross_count = len(self._cross_connect[guid][connection_type_name])
496 return count + cross_count
498 def _get_traces(self, guid):
499 return [] if guid not in self._add_trace else self._add_trace[guid]
501 def _get_parameters(self, guid):
502 return dict() if guid not in self._create_set else \
503 self._create_set[guid]
505 def _get_factory(self, guid):
506 factory_id = self._create[guid]
507 return self._factories[factory_id]
509 def _get_factory_id(self, guid):
510 """ Returns the factory ID of the (perhaps not yet) created object """
511 return self._create.get(guid, None)
513 def _validate_guid(self, guid):
514 if not guid in self._create:
515 raise RuntimeError("Element guid %d doesn't exist" % guid)
517 def _validate_not_guid(self, guid):
518 if guid in self._create:
519 raise AttributeError("Cannot add elements with the same guid: %d" %
522 def _validate_factory_id(self, factory_id):
523 if factory_id not in self._factories:
524 raise AttributeError("Invalid element type %s for testbed version %s" %
525 (factory_id, self._testbed_version))
527 def _validate_testbed_attribute(self, name):
528 if not self._attributes.has_attribute(name):
529 raise AttributeError("Invalid testbed attribute %s for testbed" % \
532 def _validate_testbed_value(self, name, value):
533 if not self._attributes.is_attribute_value_valid(name, value):
534 raise AttributeError("Invalid value %s for testbed attribute %s" % \
537 def _validate_box_attribute(self, guid, name):
538 factory = self._get_factory(guid)
539 if not factory.box_attributes.has_attribute(name):
540 raise AttributeError("Invalid attribute %s for element type %s" %
541 (name, factory.factory_id))
543 def _validate_box_value(self, guid, name, value):
544 factory = self._get_factory(guid)
545 if not factory.box_attributes.is_attribute_value_valid(name, value):
546 raise AttributeError("Invalid value %s for attribute %s" % \
549 def _validate_factory_attribute(self, guid, name):
550 factory = self._get_factory(guid)
551 if not factory.has_attribute(name):
552 raise AttributeError("Invalid attribute %s for element type %s" %
553 (name, factory.factory_id))
555 def _validate_factory_value(self, guid, name, value):
556 factory = self._get_factory(guid)
557 if not factory.is_attribute_value_valid(name, value):
558 raise AttributeError("Invalid value %s for attribute %s" % \
561 def _validate_trace(self, guid, trace_name):
562 factory = self._get_factory(guid)
563 if not trace_name in factory.traces_list:
564 raise RuntimeError("Element type '%s' has no trace '%s'" %
565 (factory.factory_id, trace_name))
567 def _validate_allow_addresses(self, guid):
568 factory = self._get_factory(guid)
569 if not factory.allow_addresses:
570 raise RuntimeError("Element type '%s' doesn't support addresses" %
572 attr_name = "maxAddresses"
573 if guid in self._create_set and attr_name in self._create_set[guid]:
574 max_addresses = self._create_set[guid][attr_name]
576 factory = self._get_factory(guid)
577 max_addresses = factory.box_attributes.get_attribute_value(attr_name)
578 if guid in self._add_address:
579 count_addresses = len(self._add_address[guid])
580 if max_addresses == count_addresses:
581 raise RuntimeError("Element guid %d of type '%s' can't accept \
582 more addresses" % (guid, factory.factory_id))
584 def _validate_allow_routes(self, guid):
585 factory = self._get_factory(guid)
586 if not factory.allow_routes:
587 raise RuntimeError("Element type '%s' doesn't support routes" %
590 def _validate_connection(self, guid1, connector_type_name1, guid2,
591 connector_type_name2, cross = False):
592 # can't connect with self
594 raise AttributeError("Can't connect guid %d to self" % \
596 # the connection is already done, so ignore
597 connected = self.get_connected(guid1, connector_type_name1,
598 connector_type_name2)
599 if guid2 in connected:
601 count1 = self._get_connection_count(guid1, connector_type_name1)
602 factory1 = self._get_factory(guid1)
603 connector_type1 = factory1.connector_type(connector_type_name1)
604 if count1 == connector_type1.max:
605 raise AttributeError("Connector %s is full for guid %d" % \
606 (connector_type_name1, guid1))
608 def _validate_modify_box_value(self, guid, name):
609 factory = self._get_factory(guid)
610 if self._status > TS.STATUS_STARTED and \
611 (factory.box_attributes.is_attribute_exec_read_only(name) or \
612 factory.box_attributes.is_attribute_exec_immutable(name)):
613 raise AttributeError("Attribute %s can only be modified during experiment design" % name)