2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.core.connector import ConnectorType
6 from nepi.core.factory import Factory
9 import nepi.util.environ
10 from nepi.util import tags, validation
11 from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
12 DeploymentConfiguration as DC, \
13 AttributeCategories as AC
15 class Parallel(object):
16 def __init__(self, factory, maxthreads = 16):
17 self.factory = factory
18 self.maxthreads = maxthreads
20 class MetadataInfo(object):
22 def connector_types(self):
23 """ dictionary of dictionaries with allowed connection information.
26 "name": connector type name,
27 "max": maximum number of connections allowed (-1 for no limit),
28 "min": minimum number of connections allowed
31 raise NotImplementedError
34 def connections(self):
35 """ array of dictionaries with allowed connection information.
37 "from": (testbed_id1, factory_id1, connector_type_name1),
38 "to": (testbed_id2, factory_id2, connector_type_name2),
39 "init_code": connection function to invoke for connection initiation
40 "compl_code": connection function to invoke for connection
42 "can_cross": whether the connection can be done across testbed
46 raise NotImplementedError
50 """ dictionary of dictionaries of all available attributes.
52 "name": attribute name,
54 "type": attribute type,
55 "value": default attribute value,
56 "range": (maximum, minimun) values else None if not defined,
57 "allowed": array of posible values,
58 "flags": attributes flags,
59 "validation_function": validation function for the attribute
60 "category": category for the attribute
63 raise NotImplementedError
67 """ dictionary of dictionaries of all available traces.
73 raise NotImplementedError
76 def create_order(self):
77 """ list of factory ids that indicates the order in which the elements
78 should be instantiated. If wrapped within a Parallel instance, they
79 will be instantiated in parallel.
81 raise NotImplementedError
84 def configure_order(self):
85 """ list of factory ids that indicates the order in which the elements
86 should be configured. If wrapped within a Parallel instance, they
87 will be configured in parallel.
89 raise NotImplementedError
92 def preconfigure_order(self):
93 """ list of factory ids that indicates the order in which the elements
94 should be preconfigured. If wrapped within a Parallel instance, they
95 will be configured in parallel.
97 Default: same as configure_order
99 return self.configure_order
102 def prestart_order(self):
103 """ list of factory ids that indicates the order in which the elements
104 should be prestart-configured. If wrapped within a Parallel instance, they
105 will be configured in parallel.
107 Default: same as configure_order
109 return self.configure_order
112 def start_order(self):
113 """ list of factory ids that indicates the order in which the elements
114 should be started. If wrapped within a Parallel instance, they
115 will be started in parallel.
117 Default: same as configure_order
119 return self.configure_order
122 def factories_info(self):
123 """ dictionary of dictionaries of factory specific information
126 "category": category the element belongs to,
127 "create_function": function for element instantiation,
128 "start_function": function for element starting,
129 "stop_function": function for element stoping,
130 "status_function": function for retrieving element status,
131 "preconfigure_function": function for element preconfiguration,
132 (just after connections are made,
133 just before netrefs are resolved)
134 "configure_function": function for element configuration,
135 "prestart_function": function for pre-start
136 element configuration (just before starting applications),
137 useful for synchronization of background setup tasks or
138 lazy instantiation or configuration of attributes
139 that require connection/cross-connection state before
141 After this point, all applications should be able to run.
142 "factory_attributes": list of references to attribute_ids,
143 "box_attributes": list of regerences to attribute_ids,
144 "traces": list of references to trace_id
145 "tags": list of references to tag_id
146 "connector_types": list of references to connector_types
149 raise NotImplementedError
152 def testbed_attributes(self):
153 """ dictionary of attributes for testbed instance configuration
154 attributes_id = dict({
155 "name": attribute name,
157 "type": attribute type,
158 "value": default attribute value,
159 "range": (maximum, minimun) values else None if not defined,
160 "allowed": array of posible values,
161 "flags": attributes flags,
162 "validation_function": validation function for the attribute
163 "category": category for the attribute
167 raise NotImplementedError
170 def testbed_id(self):
171 """ ID for the testbed """
172 raise NotImplementedError
175 def testbed_version(self):
176 """ version for the testbed """
177 raise NotImplementedError
179 class Metadata(object):
180 # These attributes should be added to all boxes
181 STANDARD_BOX_ATTRIBUTES = dict({
184 "validation_function" : validation.is_string,
185 "type" : Attribute.STRING,
186 "flags" : Attribute.ExecReadOnly |\
187 Attribute.ExecImmutable |\
189 "help" : "A unique identifier for referring to this box",
193 # These are the attribute definitions for tagged attributes
194 STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS = dict({
195 "maxAddresses" : dict({
196 "name" : "maxAddresses",
197 "validation_function" : validation.is_integer,
198 "type" : Attribute.INTEGER,
200 "flags" : Attribute.DesignReadOnly |\
201 Attribute.ExecInvisible |\
203 "help" : "The maximum allowed number of addresses",
207 # Attributes to be added to all boxes with specific tags
208 STANDARD_TAGGED_BOX_ATTRIBUTES = dict({
209 tags.ALLOW_ADDRESSES : ["maxAddresses"],
210 tags.HAS_ADDRESSES : ["maxAddresses"],
213 # These attributes should be added to all testbeds
214 STANDARD_TESTBED_ATTRIBUTES = dict({
215 "home_directory" : dict({
216 "name" : "homeDirectory",
217 "validation_function" : validation.is_string,
218 "help" : "Path to the directory where traces and other files will be stored",
219 "type" : Attribute.STRING,
221 "flags" : Attribute.ExecReadOnly |\
222 Attribute.ExecImmutable |\
227 "validation_function" : validation.is_string,
228 "type" : Attribute.STRING,
229 "flags" : Attribute.ExecReadOnly |\
230 Attribute.ExecImmutable |\
232 "help" : "A unique identifier for referring to this testbed",
236 # These attributes should be added to all testbeds
237 DEPLOYMENT_ATTRIBUTES = dict({
238 # TESTBED DEPLOYMENT ATTRIBUTES
239 DC.DEPLOYMENT_ENVIRONMENT_SETUP : dict({
240 "name" : DC.DEPLOYMENT_ENVIRONMENT_SETUP,
241 "validation_function" : validation.is_string,
242 "help" : "Shell commands to run before spawning TestbedController processes",
243 "type" : Attribute.STRING,
244 "flags" : Attribute.ExecReadOnly |\
245 Attribute.ExecImmutable |\
247 "category" : AC.CATEGORY_DEPLOYMENT,
249 DC.DEPLOYMENT_MODE: dict({
250 "name" : DC.DEPLOYMENT_MODE,
251 "help" : "Instance execution mode",
252 "type" : Attribute.ENUM,
253 "value" : DC.MODE_SINGLE_PROCESS,
256 DC.MODE_SINGLE_PROCESS
258 "flags" : Attribute.ExecReadOnly |\
259 Attribute.ExecImmutable |\
261 "validation_function" : validation.is_enum,
262 "category" : AC.CATEGORY_DEPLOYMENT,
264 DC.DEPLOYMENT_COMMUNICATION : dict({
265 "name" : DC.DEPLOYMENT_COMMUNICATION,
266 "help" : "Instance communication mode",
267 "type" : Attribute.ENUM,
268 "value" : DC.ACCESS_LOCAL,
273 "flags" : Attribute.ExecReadOnly |\
274 Attribute.ExecImmutable |\
276 "validation_function" : validation.is_enum,
277 "category" : AC.CATEGORY_DEPLOYMENT,
279 DC.DEPLOYMENT_HOST : dict({
280 "name" : DC.DEPLOYMENT_HOST,
281 "help" : "Host where the testbed will be executed",
282 "type" : Attribute.STRING,
283 "value" : "localhost",
284 "flags" : Attribute.ExecReadOnly |\
285 Attribute.ExecImmutable |\
287 "validation_function" : validation.is_string,
288 "category" : AC.CATEGORY_DEPLOYMENT,
290 DC.DEPLOYMENT_USER : dict({
291 "name" : DC.DEPLOYMENT_USER,
292 "help" : "User on the Host to execute the testbed",
293 "type" : Attribute.STRING,
294 "value" : getpass.getuser(),
295 "flags" : Attribute.ExecReadOnly |\
296 Attribute.ExecImmutable |\
298 "validation_function" : validation.is_string,
299 "category" : AC.CATEGORY_DEPLOYMENT,
301 DC.DEPLOYMENT_KEY : dict({
302 "name" : DC.DEPLOYMENT_KEY,
303 "help" : "Path to SSH key to use for connecting",
304 "type" : Attribute.STRING,
305 "flags" : Attribute.ExecReadOnly |\
306 Attribute.ExecImmutable |\
308 "validation_function" : validation.is_string,
309 "category" : AC.CATEGORY_DEPLOYMENT,
311 DC.DEPLOYMENT_PORT : dict({
312 "name" : DC.DEPLOYMENT_PORT,
313 "help" : "Port on the Host",
314 "type" : Attribute.INTEGER,
316 "flags" : Attribute.ExecReadOnly |\
317 Attribute.ExecImmutable |\
319 "validation_function" : validation.is_integer,
320 "category" : AC.CATEGORY_DEPLOYMENT,
322 DC.ROOT_DIRECTORY : dict({
323 "name" : DC.ROOT_DIRECTORY,
324 "help" : "Root directory for storing process files",
325 "type" : Attribute.STRING,
327 "flags" : Attribute.ExecReadOnly |\
328 Attribute.ExecImmutable |\
330 "validation_function" : validation.is_string, # TODO: validation.is_path
331 "category" : AC.CATEGORY_DEPLOYMENT,
333 DC.USE_AGENT : dict({
334 "name" : DC.USE_AGENT,
335 "help" : "Use -A option for forwarding of the authentication agent, if ssh access is used",
336 "type" : Attribute.BOOL,
338 "flags" : Attribute.ExecReadOnly |\
339 Attribute.ExecImmutable |\
341 "validation_function" : validation.is_bool,
342 "category" : AC.CATEGORY_DEPLOYMENT,
344 DC.LOG_LEVEL : dict({
345 "name" : DC.LOG_LEVEL,
346 "help" : "Log level for instance",
347 "type" : Attribute.ENUM,
348 "value" : DC.ERROR_LEVEL,
353 "flags" : Attribute.ExecReadOnly |\
354 Attribute.ExecImmutable |\
356 "validation_function" : validation.is_enum,
357 "category" : AC.CATEGORY_DEPLOYMENT,
361 "help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
362 "type" : Attribute.BOOL,
364 "flags" : Attribute.ExecReadOnly |\
365 Attribute.ExecImmutable |\
367 "validation_function" : validation.is_bool,
368 "category" : AC.CATEGORY_DEPLOYMENT,
372 # These attributes could appear in the boxes attribute list
373 STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({
375 "name" : "tun_proto",
376 "help" : "TUNneling protocol used",
377 "type" : Attribute.STRING,
378 "flags" : Attribute.DesignInvisible | \
379 Attribute.ExecInvisible | \
380 Attribute.ExecImmutable | \
382 "validation_function" : validation.is_string,
386 "help" : "Randomly selected TUNneling protocol cryptographic key. "
387 "Endpoints must agree to use the minimum (in lexicographic order) "
388 "of both the remote and local sides.",
389 "type" : Attribute.STRING,
390 "flags" : Attribute.DesignInvisible | \
391 Attribute.ExecInvisible | \
392 Attribute.ExecImmutable | \
394 "validation_function" : validation.is_string,
398 "help" : "Address (IP, unix socket, whatever) of the tunnel endpoint",
399 "type" : Attribute.STRING,
400 "flags" : Attribute.DesignInvisible | \
401 Attribute.ExecInvisible | \
402 Attribute.ExecImmutable | \
404 "validation_function" : validation.is_string,
408 "help" : "IP port of the tunnel endpoint",
409 "type" : Attribute.INTEGER,
410 "flags" : Attribute.DesignInvisible | \
411 Attribute.ExecInvisible | \
412 Attribute.ExecImmutable | \
414 "validation_function" : validation.is_integer,
416 ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP : dict({
417 "name" : ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
418 "help" : "Commands to set up the environment needed to run NEPI testbeds",
419 "type" : Attribute.STRING,
420 "flags" : Attribute.DesignInvisible | \
421 Attribute.ExecInvisible | \
422 Attribute.ExecImmutable | \
424 "validation_function" : validation.is_string
428 STANDARD_TESTBED_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES.copy())
430 def __init__(self, testbed_id):
431 self._testbed_id = testbed_id
432 metadata_module = self._load_metadata_module()
433 self._metadata = metadata_module.MetadataInfo()
434 if testbed_id != self._metadata.testbed_id:
435 raise RuntimeError("Bad testbed id. Asked for %s, got %s" % \
436 (testbed_id, self._metadata.testbed_id ))
439 def create_order(self):
440 return self._metadata.create_order
443 def configure_order(self):
444 return self._metadata.configure_order
447 def preconfigure_order(self):
448 return self._metadata.preconfigure_order
451 def prestart_order(self):
452 return self._metadata.prestart_order
455 def start_order(self):
456 return self._metadata.start_order
459 def testbed_version(self):
460 return self._metadata.testbed_version
463 def testbed_id(self):
464 return self._testbed_id
466 def testbed_attributes(self):
467 attributes = AttributesMap()
468 testbed_attributes = self._testbed_attributes()
469 self._add_attributes(attributes.add_attribute, testbed_attributes)
472 def build_factories(self):
474 for factory_id, info in self._metadata.factories_info.iteritems():
475 create_function = info.get("create_function")
476 start_function = info.get("start_function")
477 stop_function = info.get("stop_function")
478 status_function = info.get("status_function")
479 configure_function = info.get("configure_function")
480 preconfigure_function = info.get("preconfigure_function")
481 prestart_function = info.get("prestart_function")
483 category = info["category"]
484 factory = Factory(factory_id,
490 preconfigure_function,
495 factory_attributes = self._factory_attributes(info)
496 self._add_attributes(factory.add_attribute, factory_attributes)
497 box_attributes = self._box_attributes(info)
498 self._add_attributes(factory.add_box_attribute, box_attributes)
500 self._add_traces(factory, info)
501 self._add_tags(factory, info)
502 self._add_connector_types(factory, info)
503 factories.append(factory)
506 def _load_metadata_module(self):
507 mod_name = nepi.util.environ.find_testbed(self._testbed_id) + ".metadata"
508 if not mod_name in sys.modules:
510 return sys.modules[mod_name]
512 def _testbed_attributes(self):
514 attributes = self.STANDARD_TESTBED_ATTRIBUTES.copy()
516 attributes.update(self._metadata.testbed_attributes.copy())
519 def _factory_attributes(self, info):
520 tagged_attributes = self._tagged_attributes(info)
521 if "factory_attributes" in info:
522 definitions = self._metadata.attributes.copy()
523 # filter attributes corresponding to the factory_id
524 factory_attributes = self._filter_attributes(info["factory_attributes"],
527 factory_attributes = dict()
528 attributes = dict(tagged_attributes.items() + \
529 factory_attributes.items())
532 def _box_attributes(self, info):
533 tagged_attributes = self._tagged_attributes(info)
534 if "box_attributes" in info:
535 definitions = self.STANDARD_BOX_ATTRIBUTE_DEFINITIONS.copy()
536 definitions.update(self._metadata.attributes)
537 box_attributes = self._filter_attributes(info["box_attributes"],
540 box_attributes = dict()
541 attributes = dict(tagged_attributes.items() + \
542 box_attributes.items())
543 attributes.update(self.STANDARD_BOX_ATTRIBUTES.copy())
546 def _tagged_attributes(self, info):
547 tagged_attributes = dict()
548 for tag_id in info.get("tags", []):
549 if tag_id in self.STANDARD_TAGGED_BOX_ATTRIBUTES:
550 attr_list = self.STANDARD_TAGGED_BOX_ATTRIBUTES[tag_id]
551 attributes = self._filter_attributes(attr_list,
552 self.STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS)
553 tagged_attributes.update(attributes)
554 return tagged_attributes
556 def _filter_attributes(self, attr_list, definitions):
557 # filter attributes not corresponding to the factory
558 attributes = dict((attr_id, definitions[attr_id]) \
559 for attr_id in attr_list)
562 def _add_attributes(self, add_attr_func, attributes):
563 for attr_id, attr_info in attributes.iteritems():
564 name = attr_info["name"]
565 help = attr_info["help"]
566 type = attr_info["type"]
567 value = attr_info.get("value")
568 range = attr_info.get("range")
569 allowed = attr_info.get("allowed")
570 flags = attr_info.get("flags")
571 validation_function = attr_info["validation_function"]
572 category = attr_info.get("category")
573 add_attr_func(name, help, type, value, range, allowed, flags,
574 validation_function, category)
576 def _add_traces(self, factory, info):
577 for trace_id in info.get("traces", []):
578 trace_info = self._metadata.traces[trace_id]
579 name = trace_info["name"]
580 help = trace_info["help"]
581 factory.add_trace(name, help)
583 def _add_tags(self, factory, info):
584 for tag_id in info.get("tags", []):
585 factory.add_tag(tag_id)
587 def _add_connector_types(self, factory, info):
588 if "connector_types" in info:
589 from_connections = dict()
590 to_connections = dict()
591 for connection in self._metadata.connections:
592 from_ = connection["from"]
593 to = connection["to"]
594 can_cross = connection["can_cross"]
595 init_code = connection.get("init_code")
596 compl_code = connection.get("compl_code")
597 if from_ not in from_connections:
598 from_connections[from_] = list()
599 if to not in to_connections:
600 to_connections[to] = list()
601 from_connections[from_].append((to, can_cross, init_code,
603 to_connections[to].append((from_, can_cross, init_code,
605 for connector_id in info["connector_types"]:
606 connector_type_info = self._metadata.connector_types[
608 name = connector_type_info["name"]
609 help = connector_type_info["help"]
610 max = connector_type_info["max"]
611 min = connector_type_info["min"]
612 testbed_id = self._testbed_id
613 factory_id = factory.factory_id
614 connector_type = ConnectorType(testbed_id, factory_id, name,
616 connector_key = (testbed_id, factory_id, name)
617 if connector_key in to_connections:
618 for (from_, can_cross, init_code, compl_code) in \
619 to_connections[connector_key]:
620 (testbed_id_from, factory_id_from, name_from) = from_
621 connector_type.add_from_connection(testbed_id_from,
622 factory_id_from, name_from, can_cross,
623 init_code, compl_code)
624 if connector_key in from_connections:
625 for (to, can_cross, init_code, compl_code) in \
626 from_connections[(testbed_id, factory_id, name)]:
627 (testbed_id_to, factory_id_to, name_to) = to
628 connector_type.add_to_connection(testbed_id_to,
629 factory_id_to, name_to, can_cross, init_code,
631 factory.add_connector_type(connector_type)