--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from nepi.core.attributes import AttributesMap, Attribute
+
+class AddressableMixin(object):
+ def __init__(self, guid, factory, testbed_guid, container = None):
+ super(AddressableMixin, self).__init__(guid, factory, testbed_guid,
+ container)
+ max_addr = self._factory_attributes["MaxAddresses"] \
+ if "MaxAddresses" in self._factory_attributes else 1
+ self._max_addresses = max_addr
+ self._addresses = list()
+
+ @property
+ def addresses(self):
+ return self._addresses
+
+ @property
+ def max_addresses(self):
+ return self._max_addresses
+
+class UserAddressableMixin(AddressableMixin):
+ def __init__(self, guid, factory, testbed_guid, container = None):
+ super(UserAddressableMixin, self).__init__(guid, factory, testbed_guid,
+ container)
+
+ def add_address(self):
+ if len(self._addresses) == self.max_addresses:
+ raise RuntimeError("Maximun number of addresses for this box reached.")
+ from nepi.core.design import Address
+ address = Address()
+ self._addresses.append(address)
+ return address
+
+ def delete_address(self, address):
+ self._addresses.remove(address)
+ del address
+
+ def destroy(self):
+ super(UserAddressableMixin, self).destroy()
+ for address in list(self.addresses):
+ self.delete_address(address)
+ self._addresses = None
+
+class RoutableMixin(object):
+ def __init__(self, guid, factory, testbed_guid, container = None):
+ super(RoutableMixin, self).__init__(guid, factory, testbed_guid,
+ container)
+ self._routes = list()
+
+ @property
+ def routes(self):
+ return self._routes
+
+class UserRoutableMixin(RoutableMixin):
+ def __init__(self, guid, factory, testbed_guid, container = None):
+ super(UserRoutableMixin, self).__init__(guid, factory, testbed_guid,
+ container)
+
+ def add_route(self):
+ from nepi.core.design import Route
+ route = Route()
+ self._routes.append(route)
+ return route
+
+ def delete_route(self, route):
+ self._routes.remove(route)
+ del route
+
+ def destroy(self):
+ super(UserRoutableMixin, self).destroy()
+ for route in list(self.routes):
+ self.delete_route(route)
+ self._route = None
+
+def MixIn(MyClass, MixIn):
+ # Mixins are installed BEFORE "Box" because
+ # Box inherits from non-cooperative classes,
+ # so the MRO chain gets broken when it gets
+ # to Box.
+
+ # Install mixin
+ MyClass.__bases__ = (MixIn,) + MyClass.__bases__
+
+ # Add properties
+ # Somehow it doesn't work automatically
+ for name in dir(MixIn):
+ prop = getattr(MixIn,name,None)
+ if isinstance(prop, property):
+ setattr(MyClass, name, prop)
+
+ # Update name
+ MyClass.__name__ = MyClass.__name__.replace(
+ 'Box',
+ MixIn.__name__.replace('MixIn','')+'Box',
+ 1)
+
+class Factory(AttributesMap):
+ _box_class_cache = {}
+
+ def __init__(self, factory_id,
+ create_function,
+ start_function,
+ stop_function,
+ status_function,
+ configure_function,
+ preconfigure_function,
+ prestart_function,
+ help = None,
+ category = None,
+ allow_addresses = False,
+ has_addresses = False,
+ allow_routes = False,
+ has_routes = False):
+
+ super(Factory, self).__init__()
+
+ self._factory_id = factory_id
+ self._allow_addresses = bool(allow_addresses)
+ self._allow_routes = bool(allow_routes)
+ self._has_addresses = bool(has_addresses) or self._allow_addresses
+ self._has_routes = bool(has_routes) or self._allow_routes
+ self._create_function = create_function
+ self._start_function = start_function
+ self._stop_function = stop_function
+ self._status_function = status_function
+ self._configure_function = configure_function
+ self._preconfigure_function = preconfigure_function
+ self._prestart_function = prestart_function
+ self._help = help
+ self._category = category
+ self._connector_types = dict()
+ self._traces = dict()
+ self._tags = list()
+ self._box_attributes = AttributesMap()
+
+ from nepi.core.design import Box
+ if not self._has_addresses and not self._has_routes:
+ self._factory = Box
+ else:
+ addresses = 'w' if self._allow_addresses else ('r' if self._has_addresses else '-')
+ routes = 'w' if self._allow_routes else ('r' if self._has_routes else '-')
+ key = addresses+routes
+
+ if key in self._box_class_cache:
+ self._factory = self._box_class_cache[key]
+ else:
+ # Create base class
+ class _factory(Box):
+ def __init__(self, guid, factory, testbed_guid, container = None):
+ super(_factory, self).__init__(guid, factory, testbed_guid, container)
+
+ # Add mixins, one by one
+ if allow_addresses:
+ MixIn(_factory, UserAddressableMixin)
+ elif has_addresses:
+ MixIn(_factory, AddressableMixin)
+
+ if allow_routes:
+ MixIn(_factory, UserRoutableMixin)
+ elif has_routes:
+ MixIn(_factory, RoutableMixin)
+
+ # Put into cache
+ self._box_class_cache[key] = self._factory = _factory
+
+ @property
+ def factory_id(self):
+ return self._factory_id
+
+ @property
+ def allow_addresses(self):
+ return self._allow_addresses
+
+ @property
+ def allow_routes(self):
+ return self._allow_routes
+
+ @property
+ def has_addresses(self):
+ return self._has_addresses
+
+ @property
+ def has_routes(self):
+ return self._has_routes
+
+ @property
+ def help(self):
+ return self._help
+
+ @property
+ def category(self):
+ return self._category
+
+ @property
+ def connector_types(self):
+ return self._connector_types.values()
+
+ @property
+ def traces(self):
+ return self._traces.values()
+
+ @property
+ def traces_list(self):
+ return self._traces.keys()
+
+ @property
+ def tags(self):
+ return self._tags
+
+ @property
+ def box_attributes(self):
+ return self._box_attributes
+
+ @property
+ def create_function(self):
+ return self._create_function
+
+ @property
+ def prestart_function(self):
+ return self._prestart_function
+
+ @property
+ def start_function(self):
+ return self._start_function
+
+ @property
+ def stop_function(self):
+ return self._stop_function
+
+ @property
+ def status_function(self):
+ return self._status_function
+
+ @property
+ def configure_function(self):
+ return self._configure_function
+
+ @property
+ def preconfigure_function(self):
+ return self._preconfigure_function
+
+ def connector_type(self, name):
+ return self._connector_types[name]
+
+ def add_connector_type(self, connector_type):
+ self._connector_types[connector_type.name] = connector_type
+
+ def add_trace(self, name, help, enabled = False):
+ self._traces[name] = (name, help, enabled)
+
+ def add_tag(self, tag_id):
+ self._tags.append(tag_id)
+
+ def add_box_attribute(self, name, help, type, value = None, range = None,
+ allowed = None, flags = Attribute.NoFlags, validation_function = None,
+ category = None):
+ self._box_attributes.add_attribute(name, help, type, value, range,
+ allowed, flags, validation_function, category)
+
+ def create(self, guid, testbed_description):
+ return self._factory(guid, self, testbed_description.guid)
+
+ def destroy(self):
+ super(Factory, self).destroy()
+ self._connector_types = None
+
raise NotImplementedError
class Metadata(object):
- STANDARD_BOX_ATTRIBUTES = (
- ("label", dict(
- name = "label",
- validation_function = validation.is_string,
- type = Attribute.STRING,
- flags = Attribute.DesignOnly,
- help = "A unique identifier for referring to this box",
- )),
- )
-
- STANDARD_TESTBED_ATTRIBUTES = (
- ("home_directory", dict(
- name = "homeDirectory",
- validation_function = validation.is_string,
- help = "Path to the directory where traces and other files will be stored",
- type = Attribute.STRING,
- value = "",
- flags = Attribute.DesignOnly
- )),
- ("label", dict(
- name = "label",
- validation_function = validation.is_string,
- type = Attribute.STRING,
- flags = Attribute.DesignOnly,
- help = "A unique identifier for referring to this testbed",
- )),
- )
+ # These attributes should be added to all boxes
+ STANDARD_BOX_ATTRIBUTES = dict({
+ "label" : dict({
+ "name" : "label",
+ "validation_function" : validation.is_string,
+ "type" : Attribute.STRING,
+ "flags" : Attribute.DesignOnly,
+ "help" : "A unique identifier for referring to this box",
+ }),
+ })
+
+ # These attributes should be added to all testbeds
+ STANDARD_TESTBED_ATTRIBUTES = dict({
+ "home_directory" : dict({
+ "name" : "homeDirectory",
+ "validation_function" : validation.is_string,
+ "help" : "Path to the directory where traces and other files will be stored",
+ "type" : Attribute.STRING,
+ "value" : "",
+ "flags" : Attribute.DesignOnly
+ }),
+ "label" : dict({
+ "name" : "label",
+ "validation_function" : validation.is_string,
+ "type" : Attribute.STRING,
+ "flags" : Attribute.DesignOnly,
+ "help" : "A unique identifier for referring to this testbed",
+ }),
+ })
- DEPLOYMENT_ATTRIBUTES = (
+ # These attributes should be added to all testbeds
+ DEPLOYMENT_ATTRIBUTES = dict({
# TESTBED DEPLOYMENT ATTRIBUTES
- (DC.DEPLOYMENT_ENVIRONMENT_SETUP, dict(
- name = DC.DEPLOYMENT_ENVIRONMENT_SETUP,
- validation_function = validation.is_string,
- help = "Shell commands to run before spawning TestbedController processes",
- type = Attribute.STRING,
- flags = Attribute.DesignOnly,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.DEPLOYMENT_MODE, dict(name = DC.DEPLOYMENT_MODE,
- help = "Instance execution mode",
- type = Attribute.ENUM,
- value = DC.MODE_SINGLE_PROCESS,
- allowed = [
+ DC.DEPLOYMENT_ENVIRONMENT_SETUP : dict({
+ "name" : DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "validation_function" : validation.is_string,
+ "help" : "Shell commands to run before spawning TestbedController processes",
+ "type" : Attribute.STRING,
+ "flags" : Attribute.DesignOnly,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.DEPLOYMENT_MODE: dict({
+ "name" : DC.DEPLOYMENT_MODE,
+ "help" : "Instance execution mode",
+ "type" : Attribute.ENUM,
+ "value" : DC.MODE_SINGLE_PROCESS,
+ "allowed" : [
DC.MODE_DAEMON,
DC.MODE_SINGLE_PROCESS
],
- flags = Attribute.DesignOnly,
- validation_function = validation.is_enum,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.DEPLOYMENT_COMMUNICATION, dict(name = DC.DEPLOYMENT_COMMUNICATION,
- help = "Instance communication mode",
- type = Attribute.ENUM,
- value = DC.ACCESS_LOCAL,
- allowed = [
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_enum,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.DEPLOYMENT_COMMUNICATION : dict({
+ "name" : DC.DEPLOYMENT_COMMUNICATION,
+ "help" : "Instance communication mode",
+ "type" : Attribute.ENUM,
+ "value" : DC.ACCESS_LOCAL,
+ "allowed" : [
DC.ACCESS_LOCAL,
DC.ACCESS_SSH
],
- flags = Attribute.DesignOnly,
- validation_function = validation.is_enum,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.DEPLOYMENT_HOST, dict(name = DC.DEPLOYMENT_HOST,
- help = "Host where the testbed will be executed",
- type = Attribute.STRING,
- value = "localhost",
- flags = Attribute.DesignOnly,
- validation_function = validation.is_string,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.DEPLOYMENT_USER, dict(name = DC.DEPLOYMENT_USER,
- help = "User on the Host to execute the testbed",
- type = Attribute.STRING,
- value = getpass.getuser(),
- flags = Attribute.DesignOnly,
- validation_function = validation.is_string,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.DEPLOYMENT_KEY, dict(name = DC.DEPLOYMENT_KEY,
- help = "Path to SSH key to use for connecting",
- type = Attribute.STRING,
- flags = Attribute.DesignOnly,
- validation_function = validation.is_string,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.DEPLOYMENT_PORT, dict(name = DC.DEPLOYMENT_PORT,
- help = "Port on the Host",
- type = Attribute.INTEGER,
- value = 22,
- flags = Attribute.DesignOnly,
- validation_function = validation.is_integer,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.ROOT_DIRECTORY, dict(name = DC.ROOT_DIRECTORY,
- help = "Root directory for storing process files",
- type = Attribute.STRING,
- value = ".",
- flags = Attribute.DesignOnly,
- validation_function = validation.is_string, # TODO: validation.is_path
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.USE_AGENT, dict(name = DC.USE_AGENT,
- help = "Use -A option for forwarding of the authentication agent, if ssh access is used",
- type = Attribute.BOOL,
- value = False,
- flags = Attribute.DesignOnly,
- validation_function = validation.is_bool,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.LOG_LEVEL, dict(name = DC.LOG_LEVEL,
- help = "Log level for instance",
- type = Attribute.ENUM,
- value = DC.ERROR_LEVEL,
- allowed = [
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_enum,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.DEPLOYMENT_HOST : dict({
+ "name" : DC.DEPLOYMENT_HOST,
+ "help" : "Host where the testbed will be executed",
+ "type" : Attribute.STRING,
+ "value" : "localhost",
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_string,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.DEPLOYMENT_USER : dict({
+ "name" : DC.DEPLOYMENT_USER,
+ "help" : "User on the Host to execute the testbed",
+ "type" : Attribute.STRING,
+ "value" : getpass.getuser(),
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_string,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.DEPLOYMENT_KEY : dict({
+ "name" : DC.DEPLOYMENT_KEY,
+ "help" : "Path to SSH key to use for connecting",
+ "type" : Attribute.STRING,
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_string,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.DEPLOYMENT_PORT : dict({
+ "name" : DC.DEPLOYMENT_PORT,
+ "help" : "Port on the Host",
+ "type" : Attribute.INTEGER,
+ "value" : 22,
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_integer,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.ROOT_DIRECTORY : dict({
+ "name" : DC.ROOT_DIRECTORY,
+ "help" : "Root directory for storing process files",
+ "type" : Attribute.STRING,
+ "value" : ".",
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_string, # TODO: validation.is_path
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.USE_AGENT : dict({
+ "name" : DC.USE_AGENT,
+ "help" : "Use -A option for forwarding of the authentication agent, if ssh access is used",
+ "type" : Attribute.BOOL,
+ "value" : False,
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_bool,
+ "category" : AC.CATEGORY_DEPLOYMENT,
+ }),
+ DC.LOG_LEVEL : dict({
+ "name" : DC.LOG_LEVEL,
+ "help" : "Log level for instance",
+ "type" : Attribute.ENUM,
+ "value" : DC.ERROR_LEVEL,
+ "allowed" : [
DC.ERROR_LEVEL,
DC.DEBUG_LEVEL
],
- flags = Attribute.DesignOnly,
- validation_function = validation.is_enum,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- (DC.RECOVER, dict(name = DC.RECOVER,
- help = "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
- type = Attribute.BOOL,
- value = False,
- flags = Attribute.DesignOnly,
- validation_function = validation.is_bool,
- category = AC.CATEGORY_DEPLOYMENT,
- )),
- )
-
- STANDARD_TESTBED_ATTRIBUTES += DEPLOYMENT_ATTRIBUTES
-
- STANDARD_ATTRIBUTE_BUNDLES = {
- "tun_proto" : dict({
- "name": "tun_proto",
- "help": "TUNneling protocol used",
- "type": Attribute.STRING,
- "flags": Attribute.Invisible,
- "validation_function": validation.is_string,
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_enum,
+ "category" : AC.CATEGORY_DEPLOYMENT,
}),
- "tun_key" : dict({
- "name": "tun_key",
- "help": "Randomly selected TUNneling protocol cryptographic key. "
- "Endpoints must agree to use the minimum (in lexicographic order) "
- "of both the remote and local sides.",
- "type": Attribute.STRING,
- "flags": Attribute.Invisible,
- "validation_function": validation.is_string,
+ DC.RECOVER : dict({
+ "name" : DC.RECOVER,
+ "help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
+ "type" : Attribute.BOOL,
+ "value" : False,
+ "flags" : Attribute.DesignOnly,
+ "validation_function" : validation.is_bool,
+ "category" : AC.CATEGORY_DEPLOYMENT,
}),
- "tun_addr" : dict({
- "name": "tun_addr",
- "help": "Address (IP, unix socket, whatever) of the tunnel endpoint",
- "type": Attribute.STRING,
- "flags": Attribute.Invisible,
- "validation_function": validation.is_string,
+ })
+
+ # These attributes could appear in the boxes attribute list
+ STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({
+ "tun_proto" : dict({
+ "name" : "tun_proto",
+ "help" : "TUNneling protocol used",
+ "type" : Attribute.STRING,
+ "flags" : Attribute.Invisible,
+ "validation_function" : validation.is_string,
}),
- "tun_port" : dict({
- "name": "tun_port",
- "help": "IP port of the tunnel endpoint",
- "type": Attribute.INTEGER,
- "flags": Attribute.Invisible,
- "validation_function": validation.is_integer,
+ "tun_key" : dict({
+ "name" : "tun_key",
+ "help" : "Randomly selected TUNneling protocol cryptographic key. "
+ "Endpoints must agree to use the minimum (in lexicographic order) "
+ "of both the remote and local sides.",
+ "type" : Attribute.STRING,
+ "flags" :Attribute.Invisible,
+ "validation_function" : validation.is_string,
}),
- ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP : dict({
- "name": ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
- "help": "Commands to set up the environment needed to run NEPI testbeds",
- "type": Attribute.STRING,
- "flags": Attribute.Invisible,
- "validation_function": validation.is_string
+ "tun_addr" : dict({
+ "name": "tun_addr",
+ "help" : "Address (IP, unix socket, whatever) of the tunnel endpoint",
+ "type" : Attribute.STRING,
+ "flags" : Attribute.Invisible,
+ "validation_function" : validation.is_string,
}),
- }
+ "tun_port" : dict({
+ "name" : "tun_port",
+ "help" : "IP port of the tunnel endpoint",
+ "type" : Attribute.INTEGER,
+ "flags" : Attribute.Invisible,
+ "validation_function" : validation.is_integer,
+ }),
+ ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP : dict({
+ "name" : ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
+ "help" : "Commands to set up the environment needed to run NEPI testbeds",
+ "type" : Attribute.STRING,
+ "flags" : Attribute.Invisible,
+ "validation_function" : validation.is_string
+ }),
+ })
+ STANDARD_TESTBED_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES.copy())
def __init__(self, testbed_id, version):
self._version = version
def testbed_attributes(self):
attributes = AttributesMap()
-
- # standard attributes
- self._add_standard_attributes(attributes, None, True, False,
- self.STANDARD_TESTBED_ATTRIBUTES)
-
- # custom attributes - they override standard ones
- for attr_info in self._metadata.testbed_attributes.values():
- name = attr_info["name"]
- help = attr_info["help"]
- type = attr_info["type"]
- value = attr_info["value"] if "value" in attr_info else None
- range = attr_info["range"] if "range" in attr_info else None
- allowed = attr_info["allowed"] if "allowed" in attr_info else None
- flags = attr_info["flags"] if "flags" in attr_info \
- else Attribute.NoFlags
- validation_function = attr_info["validation_function"]
- category = attr_info["category"] if "category" in attr_info else None
- attributes.add_attribute(name, help, type, value,
- range, allowed, flags, validation_function, category)
-
+ testbed_attributes = self._testbed_attributes()
+ self._add_attributes(attributes.add_attribute, testbed_attributes)
return attributes
def build_factories(self):
allow_routes,
has_routes)
- # standard attributes
- self._add_standard_attributes(factory, info, False, True,
- self.STANDARD_BOX_ATTRIBUTES)
-
- # custom attributes - they override standard ones
- self._add_attributes(factory, info, "factory_attributes")
- self._add_attributes(factory, info, "box_attributes", True)
+ factory_attributes = self._factory_attributes(factory_id, info)
+ self._add_attributes(factory.add_attribute, factory_attributes)
+ box_attributes = self._box_attributes(factory_id, info)
+ self._add_attributes(factory.add_box_attribute, box_attributes)
self._add_traces(factory, info)
self._add_tags(factory, info)
__import__(mod_name)
return sys.modules[mod_name]
- def _add_standard_attributes(self, factory, info, design, box, STANDARD_ATTRIBUTES):
- if design:
- attr_bundle = STANDARD_ATTRIBUTES
- else:
- # Only add non-DesignOnly attributes
- def nonDesign(attr_info):
- return not (attr_info[1].get('flags',Attribute.NoFlags) & Attribute.DesignOnly)
- attr_bundle = filter(nonDesign, STANDARD_ATTRIBUTES)
- self._add_attributes(factory, info, None, box,
- attr_bundle = STANDARD_ATTRIBUTES)
+ def _testbed_attributes(self):
+ # standar attributes
+ attributes = self.STANDARD_TESTBED_ATTRIBUTES.copy()
+ # custom attributes
+ attributes.update(self._metadata.testbed_attributes.copy())
+ return attributes
+
+ def _factory_attributes(self, factory_id, info):
+ if "factory_attributes" not in info:
+ return dict()
+ definitions = self._metadata.attributes.copy()
+ # filter attributes corresponding to the factory_id
+ return self._filter_attributes(info["factory_attributes"],
+ definitions)
- def _add_attributes(self, factory, info, attr_key, box_attributes = False, attr_bundle = ()):
- if not attr_bundle and info and attr_key in info:
- definitions = self.STANDARD_ATTRIBUTE_BUNDLES.copy()
+ def _box_attributes(self, factory_id, info):
+ if "box_attributes" in info:
+ definitions = self.STANDARD_BOX_ATTRIBUTE_DEFINITIONS.copy()
definitions.update(self._metadata.attributes)
- attr_bundle = [ (attr_id, definitions[attr_id])
- for attr_id in info[attr_key] ]
- for attr_id, attr_info in attr_bundle:
+ attributes = self._filter_attributes(info["box_attributes"],
+ definitions)
+ else:
+ attributes = dict()
+ attributes.update(self.STANDARD_BOX_ATTRIBUTES.copy())
+ return attributes
+
+ def _filter_attributes(self, attr_list, definitions):
+ # filter attributes corresponding to the factory_id
+ attributes = dict((attr_id, definitions[attr_id]) \
+ for attr_id in attr_list)
+ return attributes
+
+ def _add_attributes(self, add_attr_func, attributes):
+ for attr_id, attr_info in attributes.iteritems():
name = attr_info["name"]
help = attr_info["help"]
type = attr_info["type"]
else Attribute.NoFlags
validation_function = attr_info["validation_function"]
category = attr_info["category"] if "category" in attr_info else None
- if box_attributes:
- factory.add_box_attribute(name, help, type, value, range,
- allowed, flags, validation_function, category)
- else:
- factory.add_attribute(name, help, type, value, range,
- allowed, flags, validation_function, category)
+ add_attr_func(name, help, type, value, range, allowed, flags,
+ validation_function, category)
def _add_traces(self, factory, info):
if "traces" in info: