-#!/usr/bin/env python
# -*- coding: utf-8 -*-
from nepi.core import execute
-from nepi.core.metadata import Metadata
+from nepi.core.metadata import Metadata, Parallel
from nepi.util import validation
from nepi.util.constants import TIME_NOW, \
ApplicationStatus as AS, \
TestbedStatus as TS, \
CONNECTION_DELAY
+from nepi.util.parallel import ParallelRun
import collections
import copy
+import logging
class TestbedController(execute.TestbedController):
def __init__(self, testbed_id, testbed_version):
# testbed element instances
self._elements = dict()
- self._metadata = Metadata(self._testbed_id, self._testbed_version)
+ self._metadata = Metadata(self._testbed_id)
+ if self._metadata.testbed_version != testbed_version:
+ raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
+ (testbed_id, testbed_version, self._metadata.testbed_version))
for factory in self._metadata.build_factories():
self._factories[factory.factory_id] = factory
self._attributes = self._metadata.testbed_attributes()
self._root_directory = None
+
+ # Logging
+ self._logger = logging.getLogger("nepi.core.testbed_impl")
@property
def root_directory(self):
def elements(self):
return self._elements
- def _get_factory_id(self, guid):
- """ Returns the factory ID of the (perhaps not yet) created object """
- return self._create.get(guid, None)
-
def defer_configure(self, name, value):
- if not self._attributes.has_attribute(name):
- raise AttributeError("Invalid attribute %s for testbed" % name)
- # Validation
+ self._validate_testbed_attribute(name)
+ self._validate_testbed_value(name, value)
self._attributes.set_attribute_value(name, value)
self._configure[name] = value
def defer_create(self, guid, factory_id):
- if factory_id not in self._factories:
- raise AttributeError("Invalid element type %s for testbed version %s" %
- (factory_id, self._testbed_version))
- if guid in self._create:
- raise AttributeError("Cannot add elements with the same guid: %d" %
- guid)
+ self._validate_factory_id(factory_id)
+ self._validate_not_guid(guid)
self._create[guid] = factory_id
def defer_create_set(self, guid, name, value):
- if not guid in self._create:
- raise RuntimeError("Element guid %d doesn't exist" % guid)
- factory = self._get_factory(guid)
- if not factory.box_attributes.has_attribute(name):
- raise AttributeError("Invalid attribute %s for element type %s" %
- (name, factory.factory_id))
- if not factory.box_attributes.is_attribute_value_valid(name, value):
- raise AttributeError("Invalid value %s for attribute %s" % \
- (value, name))
+ self._validate_guid(guid)
+ self._validate_box_attribute(guid, name)
+ self._validate_box_value(guid, name, value)
if guid not in self._create_set:
self._create_set[guid] = dict()
self._create_set[guid][name] = value
def defer_factory_set(self, guid, name, value):
- if not guid in self._create:
- raise RuntimeError("Element guid %d doesn't exist" % guid)
- factory = self._get_factory(guid)
- if not factory.has_attribute(name):
- raise AttributeError("Invalid attribute %s for element type %s" %
- (name, factory.factory_id))
- if not factory.is_attribute_value_valid(name, value):
- raise AttributeError("Invalid value %s for attribute %s" % \
- (value, name))
+ self._validate_guid(guid)
+ self._validate_factory_attribute(guid, name)
+ self._validate_factory_value(guid, name, value)
if guid not in self._factory_set:
self._factory_set[guid] = dict()
self._factory_set[guid][name] = value
def defer_connect(self, guid1, connector_type_name1, guid2,
connector_type_name2):
+ self._validate_guid(guid1)
+ self._validate_guid(guid2)
factory1 = self._get_factory(guid1)
factory_id2 = self._create[guid2]
- # TODO VALIDATE!!!
- #if self.box.guid == connector.box.guid:
- # return False
- #if self.is_full() or connector.is_full():
- # return False
- #if self.is_connected(connector):
- # return False
- #count = self._get_connection_count(guid1, connector_type_name1)
connector_type = factory1.connector_type(connector_type_name1)
connector_type.can_connect(self._testbed_id, factory_id2,
connector_type_name2, False)
+ self._validate_connection(guid1, connector_type_name1, guid2,
+ connector_type_name2)
+
if not guid1 in self._connect:
self._connect[guid1] = dict()
if not connector_type_name1 in self._connect[guid1]:
if not connector_type_name2 in self._connect[guid2]:
self._connect[guid2][connector_type_name2] = dict()
self._connect[guid2][connector_type_name2][guid1] = \
- connector_type_name1
+ connector_type_name1
def defer_cross_connect(self, guid, connector_type_name, cross_guid,
cross_testbed_guid, cross_testbed_id, cross_factory_id,
cross_connector_type_name):
+ self._validate_guid(guid)
factory = self._get_factory(guid)
- # TODO VALIDATE!!!
- #if self.box.guid == connector.box.guid:
- # return False
- #if self.is_full() or connector.is_full():
- # return False
- #if self.is_connected(connector):
- # return False
- #count = self._get_connection_count(guid, connector_type_name)
connector_type = factory.connector_type(connector_type_name)
connector_type.can_connect(cross_testbed_id, cross_factory_id,
cross_connector_type_name, True)
+ self._validate_connection(guid, connector_type_name, cross_guid,
+ cross_connector_type_name)
+
if not guid in self._cross_connect:
self._cross_connect[guid] = dict()
if not connector_type_name in self._cross_connect[guid]:
cross_factory_id, cross_connector_type_name)
def defer_add_trace(self, guid, trace_name):
- if not guid in self._create:
- raise RuntimeError("Element guid %d doesn't exist" % guid)
- factory = self._get_factory(guid)
- if not trace_name in factory.traces_list:
- raise RuntimeError("Element type '%s' has no trace '%s'" %
- (factory.factory_id, trace_name))
+ self._validate_guid(guid)
+ self._validate_trace(guid, trace_name)
if not guid in self._add_trace:
self._add_trace[guid] = list()
self._add_trace[guid].append(trace_name)
def defer_add_address(self, guid, address, netprefix, broadcast):
- if not guid in self._create:
- raise RuntimeError("Element guid %d doesn't exist" % guid)
- factory = self._get_factory(guid)
- if not factory.allow_addresses:
- raise RuntimeError("Element type '%s' doesn't support addresses" %
- factory.factory_id)
- max_addresses = 1 # TODO: MAKE THIS PARAMETRIZABLE
- if guid in self._add_address:
- count_addresses = len(self._add_address[guid])
- if max_addresses == count_addresses:
- raise RuntimeError("Element guid %d of type '%s' can't accept \
- more addresses" % (guid, factory.factory_id))
- else:
+ self._validate_guid(guid)
+ self._validate_allow_addresses(guid)
+ if guid not in self._add_address:
self._add_address[guid] = list()
self._add_address[guid].append((address, netprefix, broadcast))
- def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
- if not guid in self._create:
- raise RuntimeError("Element guid %d doesn't exist" % guid)
- factory = self._get_factory(guid)
- if not factory.allow_routes:
- raise RuntimeError("Element type '%s' doesn't support routes" %
- factory.factory_id)
+ def defer_add_route(self, guid, destination, netprefix, nexthop,
+ metric = 0, device = None):
+ self._validate_guid(guid)
+ self._validate_allow_routes(guid)
if not guid in self._add_route:
self._add_route[guid] = list()
- self._add_route[guid].append((destination, netprefix, nexthop, metric))
+ self._add_route[guid].append((destination, netprefix, nexthop,
+ metric, device))
def do_setup(self):
self._root_directory = self._attributes.\
self._status = TS.STATUS_CONNECTED
def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
+ logger = self._logger
+
guids = collections.defaultdict(list)
# order guids (elements) according to factory_id
for guid, factory_id in self._create.iteritems():
guids[factory_id].append(guid)
+
# configure elements following the factory_id order
for factory_id in order:
+ # Create a parallel runner if we're given a Parallel() wrapper
+ runner = None
+ if isinstance(factory_id, Parallel):
+ runner = ParallelRun(factory_id.maxthreads)
+ factory_id = factory_id.factory
+
# omit the factories that have no element to create
if factory_id not in guids:
continue
+
+ # configure action
factory = self._factories[factory_id]
- if not getattr(factory, action):
+ if isinstance(action, basestring) and not getattr(factory, action):
continue
- for guid in guids[factory_id]:
- getattr(factory, action)(self, guid)
+ def perform_action(guid):
+ if isinstance(action, basestring):
+ getattr(factory, action)(self, guid)
+ else:
+ action(self, guid)
if postaction:
postaction(self, guid)
+
+ # perform the action on all elements, in parallel if so requested
+ if runner:
+ logger.debug("TestbedController: Starting parallel %s", action)
+ runner.start()
+
+ for guid in guids[factory_id]:
+ if runner:
+ logger.debug("TestbedController: Scheduling %s on %s", action, guid)
+ runner.put(perform_action, guid)
+ else:
+ logger.debug("TestbedController: Performing %s on %s", action, guid)
+ perform_action(guid)
+
+ # sync
+ if runner:
+ runner.sync()
+
+ # post hook
if poststep:
for guid in guids[factory_id]:
- poststep(self, guid)
+ if runner:
+ logger.debug("TestbedController: Scheduling post-%s on %s", action, guid)
+ runner.put(poststep, self, guid)
+ else:
+ logger.debug("TestbedController: Performing post-%s on %s", action, guid)
+ poststep(self, guid)
+
+ # sync
+ if runner:
+ runner.join()
+ logger.debug("TestbedController: Finished parallel %s", action)
@staticmethod
def do_poststep_preconfigure(self, guid):
cross_connector_type_name,
True)
if connect_code:
+ if hasattr(connect_code, "func"):
+ func_name = connect_code.func.__name__
+ elif hasattr(connect_code, "__name__"):
+ func_name = connect_code.__name__
+ else:
+ func_name = repr(connect_code)
+ self._logger.debug("Cross-connect - guid: %d, connect_code: %s " % (
+ guid, func_name))
elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
connect_code(self, guid, elem_cross_data)
self._status = TS.STATUS_CROSS_CONNECTED
def set(self, guid, name, value, time = TIME_NOW):
- if not guid in self._create:
- raise RuntimeError("Element guid %d doesn't exist" % guid)
- factory = self._get_factory(guid)
- if not factory.box_attributes.has_attribute(name):
- raise AttributeError("Invalid attribute %s for element type %s" %
- (name, factory.factory_id))
- if self._status > TS.STATUS_STARTED and \
- factory.box_attributes.is_attribute_design_only(name):
- raise AttributeError("Attribute %s can only be modified during experiment design" % name)
- if not factory.box_attributes.is_attribute_value_valid(name, value):
- raise AttributeError("Invalid value %s for attribute %s" % \
- (value, name))
+ self._validate_guid(guid)
+ self._validate_box_attribute(guid, name)
+ self._validate_box_value(guid, name, value)
+ self._validate_modify_box_value(guid, name)
if guid not in self._set:
self._set[guid] = dict()
self._setlog[guid] = dict()
through the defer_create interface, and AttributeError if the
attribute isn't available (doesn't exist or is design-only)
"""
- if not guid in self._create:
- raise KeyError, "Element guid %d doesn't exist" % guid
- factory = self._get_factory(guid)
- if not factory.box_attributes.has_attribute(name):
- raise AttributeError, "Invalid attribute %s for element type %s" % \
- (name, factory.factory_id)
+ self._validate_guid(guid)
+ self._validate_box_attribute(guid, name)
if guid in self._set and name in self._set[guid]:
return self._set[guid][name]
if guid in self._create_set and name in self._create_set[guid]:
return self._create_set[guid][name]
+ # if nothing else found, returns the factory default value
+ factory = self._get_factory(guid)
return factory.box_attributes.get_attribute_value(name)
def get_route(self, guid, index, attribute):
return addresses[index][attribute_index]
- def get_attribute_list(self, guid):
+ def get_attribute_list(self, guid, filter_flags = None, exclude = False):
factory = self._get_factory(guid)
attribute_list = list()
- return factory.box_attributes.attributes_list
+ return factory.box_attributes.get_attribute_list(filter_flags, exclude)
def get_factory_id(self, guid):
factory = self._get_factory(guid)
def status(self, guid = None):
if not guid:
return self._status
- if not guid in self._create:
- raise RuntimeError("Element guid %d doesn't exist" % guid)
+ self._validate_guid(guid)
factory = self._get_factory(guid)
status_function = factory.status_function
if status_function:
return status_function(self, guid)
return AS.STATUS_UNDETERMINED
+
+ def testbed_status(self):
+ return self._status
def trace(self, guid, trace_id, attribute='value'):
if attribute == 'value':
fd.close()
elif attribute == 'path':
content = self.trace_filepath(guid, trace_id)
+ elif attribute == 'filename':
+ content = self.trace_filename(guid, trace_id)
else:
content = None
return content
"""
raise NotImplementedError
+ def trace_filename(self, guid, trace_id):
+ """
+ Return a trace's file name, for TestbedController's default
+ implementation of trace()
+ """
+ raise NotImplementedError
+
#shutdown: NotImplementedError
def get_connected(self, guid, connector_type_name,
factory_id = self._create[guid]
return self._factories[factory_id]
+ def _get_factory_id(self, guid):
+ """ Returns the factory ID of the (perhaps not yet) created object """
+ return self._create.get(guid, None)
+
+ def _validate_guid(self, guid):
+ if not guid in self._create:
+ raise RuntimeError("Element guid %d doesn't exist" % guid)
+
+ def _validate_not_guid(self, guid):
+ if guid in self._create:
+ raise AttributeError("Cannot add elements with the same guid: %d" %
+ guid)
+
+ def _validate_factory_id(self, factory_id):
+ if factory_id not in self._factories:
+ raise AttributeError("Invalid element type %s for testbed version %s" %
+ (factory_id, self._testbed_version))
+
+ def _validate_testbed_attribute(self, name):
+ if not self._attributes.has_attribute(name):
+ raise AttributeError("Invalid testbed attribute %s for testbed" % \
+ name)
+
+ def _validate_testbed_value(self, name, value):
+ if not self._attributes.is_attribute_value_valid(name, value):
+ raise AttributeError("Invalid value %r for testbed attribute %s" % \
+ (value, name))
+
+ def _validate_box_attribute(self, guid, name):
+ factory = self._get_factory(guid)
+ if not factory.box_attributes.has_attribute(name):
+ raise AttributeError("Invalid attribute %s for element type %s" %
+ (name, factory.factory_id))
+
+ def _validate_box_value(self, guid, name, value):
+ factory = self._get_factory(guid)
+ if not factory.box_attributes.is_attribute_value_valid(name, value):
+ raise AttributeError("Invalid value %r for attribute %s" % \
+ (value, name))
+
+ def _validate_factory_attribute(self, guid, name):
+ factory = self._get_factory(guid)
+ if not factory.has_attribute(name):
+ raise AttributeError("Invalid attribute %s for element type %s" %
+ (name, factory.factory_id))
+
+ def _validate_factory_value(self, guid, name, value):
+ factory = self._get_factory(guid)
+ if not factory.is_attribute_value_valid(name, value):
+ raise AttributeError("Invalid value %r for attribute %s" % \
+ (value, name))
+
+ def _validate_trace(self, guid, trace_name):
+ factory = self._get_factory(guid)
+ if not trace_name in factory.traces_list:
+ raise RuntimeError("Element type '%s' has no trace '%s'" %
+ (factory.factory_id, trace_name))
+
+ def _validate_allow_addresses(self, guid):
+ factory = self._get_factory(guid)
+ if not factory.allow_addresses:
+ raise RuntimeError("Element type '%s' doesn't support addresses" %
+ factory.factory_id)
+ attr_name = "maxAddresses"
+ if guid in self._create_set and attr_name in self._create_set[guid]:
+ max_addresses = self._create_set[guid][attr_name]
+ else:
+ factory = self._get_factory(guid)
+ max_addresses = factory.box_attributes.get_attribute_value(attr_name)
+ if guid in self._add_address:
+ count_addresses = len(self._add_address[guid])
+ if max_addresses == count_addresses:
+ raise RuntimeError("Element guid %d of type '%s' can't accept \
+ more addresses" % (guid, factory.factory_id))
+
+ def _validate_allow_routes(self, guid):
+ factory = self._get_factory(guid)
+ if not factory.allow_routes:
+ raise RuntimeError("Element type '%s' doesn't support routes" %
+ factory.factory_id)
+
+ def _validate_connection(self, guid1, connector_type_name1, guid2,
+ connector_type_name2, cross = False):
+ # can't connect with self
+ if guid1 == guid2:
+ raise AttributeError("Can't connect guid %d to self" % \
+ (guid1))
+ # the connection is already done, so ignore
+ connected = self.get_connected(guid1, connector_type_name1,
+ connector_type_name2)
+ if guid2 in connected:
+ return
+ count1 = self._get_connection_count(guid1, connector_type_name1)
+ factory1 = self._get_factory(guid1)
+ connector_type1 = factory1.connector_type(connector_type_name1)
+ if count1 == connector_type1.max:
+ raise AttributeError("Connector %s is full for guid %d" % \
+ (connector_type_name1, guid1))
+
+ def _validate_modify_box_value(self, guid, name):
+ factory = self._get_factory(guid)
+ if self._status > TS.STATUS_STARTED and \
+ (factory.box_attributes.is_attribute_exec_read_only(name) or \
+ factory.box_attributes.is_attribute_exec_immutable(name)):
+ raise AttributeError("Attribute %s can only be modified during experiment design" % name)
+