2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.core.connector import ConnectorType
6 from nepi.core.factory import Factory
9 import nepi.util.environ
10 from nepi.util import tags, validation
11 from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
12 DeploymentConfiguration as DC, \
13 AttributeCategories as AC
15 class Parallel(object):
16 def __init__(self, factory, maxthreads = 64):
17 self.factory = factory
18 self.maxthreads = maxthreads
20 class MetadataInfo(object):
22 def connector_types(self):
23 """ dictionary of dictionaries with allowed connection information.
26 "name": connector type name,
27 "max": maximum number of connections allowed (-1 for no limit),
28 "min": minimum number of connections allowed
31 raise NotImplementedError
34 def connections(self):
35 """ array of dictionaries with allowed connection information.
37 "from": (testbed_id1, factory_id1, connector_type_name1),
38 "to": (testbed_id2, factory_id2, connector_type_name2),
39 "init_code": connection function to invoke for connection initiation
40 "compl_code": connection function to invoke for connection
42 "can_cross": whether the connection can be done across testbed
46 raise NotImplementedError
50 """ dictionary of dictionaries of all available attributes.
52 "name": attribute name,
54 "type": attribute type,
55 "value": default attribute value,
56 "range": (maximum, minimun) values else None if not defined,
57 "allowed": array of posible values,
58 "flags": attributes flags,
59 "validation_function": validation function for the attribute
60 "category": category for the attribute
63 raise NotImplementedError
67 """ dictionary of dictionaries of all available traces.
73 raise NotImplementedError
76 def create_order(self):
77 """ list of factory ids that indicates the order in which the elements
78 should be instantiated. If wrapped within a Parallel instance, they
79 will be instantiated in parallel.
81 raise NotImplementedError
84 def configure_order(self):
85 """ list of factory ids that indicates the order in which the elements
86 should be configured. If wrapped within a Parallel instance, they
87 will be configured in parallel.
89 raise NotImplementedError
92 def preconfigure_order(self):
93 """ list of factory ids that indicates the order in which the elements
94 should be preconfigured. If wrapped within a Parallel instance, they
95 will be configured in parallel.
97 Default: same as configure_order
99 return self.configure_order
102 def prestart_order(self):
103 """ list of factory ids that indicates the order in which the elements
104 should be prestart-configured. If wrapped within a Parallel instance, they
105 will be configured in parallel.
107 Default: same as configure_order
109 return self.configure_order
112 def start_order(self):
113 """ list of factory ids that indicates the order in which the elements
114 should be started. If wrapped within a Parallel instance, they
115 will be started in parallel.
117 Default: same as configure_order
119 return self.configure_order
122 def factories_info(self):
123 """ dictionary of dictionaries of factory specific information
126 "category": category the element belongs to,
127 "create_function": function for element instantiation,
128 "start_function": function for element starting,
129 "stop_function": function for element stoping,
130 "status_function": function for retrieving element status,
131 "preconfigure_function": function for element preconfiguration,
132 (just after connections are made,
133 just before netrefs are resolved)
134 "configure_function": function for element configuration,
135 "prestart_function": function for pre-start
136 element configuration (just before starting applications),
137 useful for synchronization of background setup tasks or
138 lazy instantiation or configuration of attributes
139 that require connection/cross-connection state before
141 After this point, all applications should be able to run.
142 "factory_attributes": list of references to attribute_ids,
143 "box_attributes": list of regerences to attribute_ids,
144 "traces": list of references to trace_id
145 "tags": list of references to tag_id
146 "connector_types": list of references to connector_types
149 raise NotImplementedError
152 def testbed_attributes(self):
153 """ dictionary of attributes for testbed instance configuration
154 attributes_id = dict({
155 "name": attribute name,
157 "type": attribute type,
158 "value": default attribute value,
159 "range": (maximum, minimun) values else None if not defined,
160 "allowed": array of posible values,
161 "flags": attributes flags,
162 "validation_function": validation function for the attribute
163 "category": category for the attribute
167 raise NotImplementedError
170 def testbed_id(self):
171 """ ID for the testbed """
172 raise NotImplementedError
175 def testbed_version(self):
176 """ version for the testbed """
177 raise NotImplementedError
179 class Metadata(object):
180 # These attributes should be added to all boxes
181 STANDARD_BOX_ATTRIBUTES = dict({
184 "validation_function" : validation.is_string,
185 "type" : Attribute.STRING,
186 "flags" : Attribute.ExecReadOnly |\
187 Attribute.ExecImmutable |\
189 "help" : "A unique identifier for referring to this box",
193 # These are the attribute definitions for tagged attributes
194 STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS = dict({
195 "maxAddresses" : dict({
196 "name" : "maxAddresses",
197 "validation_function" : validation.is_integer,
198 "type" : Attribute.INTEGER,
200 "flags" : Attribute.DesignReadOnly |\
201 Attribute.ExecInvisible |\
203 "help" : "The maximum allowed number of addresses",
207 # Attributes to be added to all boxes with specific tags
208 STANDARD_TAGGED_BOX_ATTRIBUTES = dict({
209 tags.ALLOW_ADDRESSES : ["maxAddresses"],
210 tags.HAS_ADDRESSES : ["maxAddresses"],
213 # These attributes should be added to all testbeds
214 STANDARD_TESTBED_ATTRIBUTES = dict({
215 "home_directory" : dict({
216 "name" : "homeDirectory",
217 "validation_function" : validation.is_string,
218 "help" : "Path to the directory where traces and other files will be stored",
219 "type" : Attribute.STRING,
221 "flags" : Attribute.ExecReadOnly |\
222 Attribute.ExecImmutable |\
227 "validation_function" : validation.is_string,
228 "type" : Attribute.STRING,
229 "flags" : Attribute.ExecReadOnly |\
230 Attribute.ExecImmutable |\
232 "help" : "A unique identifier for referring to this testbed",
236 # These attributes should be added to all testbeds
237 DEPLOYMENT_ATTRIBUTES = dict({
238 # TESTBED DEPLOYMENT ATTRIBUTES
239 DC.DEPLOYMENT_ENVIRONMENT_SETUP : dict({
240 "name" : DC.DEPLOYMENT_ENVIRONMENT_SETUP,
241 "validation_function" : validation.is_string,
242 "help" : "Shell commands to run before spawning TestbedController processes",
243 "type" : Attribute.STRING,
244 "flags" : Attribute.ExecReadOnly |\
245 Attribute.ExecImmutable |\
247 "category" : AC.CATEGORY_DEPLOYMENT,
249 DC.DEPLOYMENT_MODE: dict({
250 "name" : DC.DEPLOYMENT_MODE,
251 "help" : "Instance execution mode",
252 "type" : Attribute.ENUM,
253 "value" : DC.MODE_SINGLE_PROCESS,
256 DC.MODE_SINGLE_PROCESS
258 "flags" : Attribute.ExecReadOnly |\
259 Attribute.ExecImmutable |\
261 "validation_function" : validation.is_enum,
262 "category" : AC.CATEGORY_DEPLOYMENT,
264 DC.DEPLOYMENT_COMMUNICATION : dict({
265 "name" : DC.DEPLOYMENT_COMMUNICATION,
266 "help" : "Instance communication mode",
267 "type" : Attribute.ENUM,
268 "value" : DC.ACCESS_LOCAL,
273 "flags" : Attribute.ExecReadOnly |\
274 Attribute.ExecImmutable |\
276 "validation_function" : validation.is_enum,
277 "category" : AC.CATEGORY_DEPLOYMENT,
279 DC.DEPLOYMENT_HOST : dict({
280 "name" : DC.DEPLOYMENT_HOST,
281 "help" : "Host where the testbed will be executed",
282 "type" : Attribute.STRING,
283 "value" : "localhost",
284 "flags" : Attribute.ExecReadOnly |\
285 Attribute.ExecImmutable |\
287 "validation_function" : validation.is_string,
288 "category" : AC.CATEGORY_DEPLOYMENT,
290 DC.DEPLOYMENT_USER : dict({
291 "name" : DC.DEPLOYMENT_USER,
292 "help" : "User on the Host to execute the testbed",
293 "type" : Attribute.STRING,
294 "value" : getpass.getuser(),
295 "flags" : Attribute.ExecReadOnly |\
296 Attribute.ExecImmutable |\
298 "validation_function" : validation.is_string,
299 "category" : AC.CATEGORY_DEPLOYMENT,
301 DC.DEPLOYMENT_KEY : dict({
302 "name" : DC.DEPLOYMENT_KEY,
303 "help" : "Path to SSH key to use for connecting",
304 "type" : Attribute.STRING,
305 "flags" : Attribute.ExecReadOnly |\
306 Attribute.ExecImmutable |\
308 "validation_function" : validation.is_string,
309 "category" : AC.CATEGORY_DEPLOYMENT,
311 DC.DEPLOYMENT_PORT : dict({
312 "name" : DC.DEPLOYMENT_PORT,
313 "help" : "Port on the Host",
314 "type" : Attribute.INTEGER,
316 "flags" : Attribute.ExecReadOnly |\
317 Attribute.ExecImmutable |\
319 "validation_function" : validation.is_integer,
320 "category" : AC.CATEGORY_DEPLOYMENT,
322 DC.ROOT_DIRECTORY : dict({
323 "name" : DC.ROOT_DIRECTORY,
324 "help" : "Root directory for storing process files",
325 "type" : Attribute.STRING,
327 "flags" : Attribute.ExecReadOnly |\
328 Attribute.ExecImmutable |\
330 "validation_function" : validation.is_string, # TODO: validation.is_path
331 "category" : AC.CATEGORY_DEPLOYMENT,
333 DC.USE_AGENT : dict({
334 "name" : DC.USE_AGENT,
335 "help" : "Use -A option for forwarding of the authentication agent, if ssh access is used",
336 "type" : Attribute.BOOL,
338 "flags" : Attribute.ExecReadOnly |\
339 Attribute.ExecImmutable |\
341 "validation_function" : validation.is_bool,
342 "category" : AC.CATEGORY_DEPLOYMENT,
344 DC.LOG_LEVEL : dict({
345 "name" : DC.LOG_LEVEL,
346 "help" : "Log level for instance",
347 "type" : Attribute.ENUM,
348 "value" : DC.ERROR_LEVEL,
353 "flags" : Attribute.ExecReadOnly |\
354 Attribute.ExecImmutable |\
356 "validation_function" : validation.is_enum,
357 "category" : AC.CATEGORY_DEPLOYMENT,
359 DC.RECOVERY_POLICY : dict({
360 "name" : DC.RECOVERY_POLICY,
361 "help" : "Specifies what action to take in the event of a failure.",
362 "type" : Attribute.ENUM,
363 "value" : DC.POLICY_FAIL,
369 "flags" : Attribute.ExecReadOnly |\
370 Attribute.ExecImmutable |\
372 "validation_function" : validation.is_enum,
373 "category" : AC.CATEGORY_DEPLOYMENT,
376 PROXY_ATTRIBUTES = dict({
379 "help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
380 "type" : Attribute.BOOL,
382 "flags" : Attribute.ExecReadOnly |\
383 Attribute.ExecImmutable |\
385 "validation_function" : validation.is_bool,
386 "category" : AC.CATEGORY_DEPLOYMENT,
389 PROXY_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES)
391 # These attributes could appear in the boxes attribute list
392 STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({
394 "name" : "tun_proto",
395 "help" : "TUNneling protocol used",
396 "type" : Attribute.STRING,
397 "flags" : Attribute.DesignInvisible | \
398 Attribute.ExecInvisible | \
399 Attribute.ExecImmutable | \
401 "validation_function" : validation.is_string,
405 "help" : "Randomly selected TUNneling protocol cryptographic key. "
406 "Endpoints must agree to use the minimum (in lexicographic order) "
407 "of both the remote and local sides.",
408 "type" : Attribute.STRING,
409 "flags" : Attribute.DesignInvisible | \
410 Attribute.ExecInvisible | \
411 Attribute.ExecImmutable | \
413 "validation_function" : validation.is_string,
417 "help" : "Address (IP, unix socket, whatever) of the tunnel endpoint",
418 "type" : Attribute.STRING,
419 "flags" : Attribute.DesignInvisible | \
420 Attribute.ExecInvisible | \
421 Attribute.ExecImmutable | \
423 "validation_function" : validation.is_string,
427 "help" : "IP port of the tunnel endpoint",
428 "type" : Attribute.INTEGER,
429 "flags" : Attribute.DesignInvisible | \
430 Attribute.ExecInvisible | \
431 Attribute.ExecImmutable | \
433 "validation_function" : validation.is_integer,
435 "tun_cipher" : dict({
436 "name" : "tun_cipher",
437 "help" : "Cryptographic cipher used for tunnelling",
438 "type" : Attribute.ENUM,
447 "flags" : Attribute.ExecImmutable,
448 "validation_function" : validation.is_enum,
450 ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP : dict({
451 "name" : ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
452 "help" : "Commands to set up the environment needed to run NEPI testbeds",
453 "type" : Attribute.STRING,
454 "flags" : Attribute.DesignInvisible | \
455 Attribute.ExecInvisible | \
456 Attribute.ExecImmutable | \
458 "validation_function" : validation.is_string
462 STANDARD_TESTBED_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES.copy())
464 def __init__(self, testbed_id):
465 self._testbed_id = testbed_id
466 metadata_module = self._load_metadata_module()
467 self._metadata = metadata_module.MetadataInfo()
468 if testbed_id != self._metadata.testbed_id:
469 raise RuntimeError("Bad testbed id. Asked for %s, got %s" % \
470 (testbed_id, self._metadata.testbed_id ))
473 def create_order(self):
474 return self._metadata.create_order
477 def configure_order(self):
478 return self._metadata.configure_order
481 def preconfigure_order(self):
482 return self._metadata.preconfigure_order
485 def prestart_order(self):
486 return self._metadata.prestart_order
489 def start_order(self):
490 return self._metadata.start_order
493 def testbed_version(self):
494 return self._metadata.testbed_version
497 def testbed_id(self):
498 return self._testbed_id
501 def supported_recovery_policies(self):
502 return self._metadata.supported_recovery_policies
504 def testbed_attributes(self):
505 attributes = AttributesMap()
506 testbed_attributes = self._testbed_attributes()
507 self._add_attributes(attributes.add_attribute, testbed_attributes)
510 def build_factories(self):
512 for factory_id, info in self._metadata.factories_info.iteritems():
513 create_function = info.get("create_function")
514 start_function = info.get("start_function")
515 stop_function = info.get("stop_function")
516 status_function = info.get("status_function")
517 configure_function = info.get("configure_function")
518 preconfigure_function = info.get("preconfigure_function")
519 prestart_function = info.get("prestart_function")
521 category = info["category"]
522 factory = Factory(factory_id,
528 preconfigure_function,
533 factory_attributes = self._factory_attributes(info)
534 self._add_attributes(factory.add_attribute, factory_attributes)
535 box_attributes = self._box_attributes(info)
536 self._add_attributes(factory.add_box_attribute, box_attributes)
538 self._add_traces(factory, info)
539 self._add_tags(factory, info)
540 self._add_connector_types(factory, info)
541 factories.append(factory)
544 def _load_metadata_module(self):
545 mod_name = nepi.util.environ.find_testbed(self._testbed_id) + ".metadata"
546 if not mod_name in sys.modules:
548 return sys.modules[mod_name]
550 def _testbed_attributes(self):
552 attributes = self.STANDARD_TESTBED_ATTRIBUTES.copy()
554 attributes.update(self._metadata.testbed_attributes.copy())
557 def _factory_attributes(self, info):
558 tagged_attributes = self._tagged_attributes(info)
559 if "factory_attributes" in info:
560 definitions = self._metadata.attributes.copy()
561 # filter attributes corresponding to the factory_id
562 factory_attributes = self._filter_attributes(info["factory_attributes"],
565 factory_attributes = dict()
566 attributes = dict(tagged_attributes.items() + \
567 factory_attributes.items())
570 def _box_attributes(self, info):
571 tagged_attributes = self._tagged_attributes(info)
572 if "box_attributes" in info:
573 definitions = self.STANDARD_BOX_ATTRIBUTE_DEFINITIONS.copy()
574 definitions.update(self._metadata.attributes)
575 box_attributes = self._filter_attributes(info["box_attributes"],
578 box_attributes = dict()
579 attributes = dict(tagged_attributes.items() + \
580 box_attributes.items())
581 attributes.update(self.STANDARD_BOX_ATTRIBUTES.copy())
584 def _tagged_attributes(self, info):
585 tagged_attributes = dict()
586 for tag_id in info.get("tags", []):
587 if tag_id in self.STANDARD_TAGGED_BOX_ATTRIBUTES:
588 attr_list = self.STANDARD_TAGGED_BOX_ATTRIBUTES[tag_id]
589 attributes = self._filter_attributes(attr_list,
590 self.STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS)
591 tagged_attributes.update(attributes)
592 return tagged_attributes
594 def _filter_attributes(self, attr_list, definitions):
595 # filter attributes not corresponding to the factory
596 attributes = dict((attr_id, definitions[attr_id]) \
597 for attr_id in attr_list)
600 def _add_attributes(self, add_attr_func, attributes):
601 for attr_id, attr_info in attributes.iteritems():
602 name = attr_info["name"]
603 help = attr_info["help"]
604 type = attr_info["type"]
605 value = attr_info.get("value")
606 range = attr_info.get("range")
607 allowed = attr_info.get("allowed")
608 flags = attr_info.get("flags")
609 validation_function = attr_info["validation_function"]
610 category = attr_info.get("category")
611 add_attr_func(name, help, type, value, range, allowed, flags,
612 validation_function, category)
614 def _add_traces(self, factory, info):
615 for trace_id in info.get("traces", []):
616 trace_info = self._metadata.traces[trace_id]
617 name = trace_info["name"]
618 help = trace_info["help"]
619 factory.add_trace(name, help)
621 def _add_tags(self, factory, info):
622 for tag_id in info.get("tags", []):
623 factory.add_tag(tag_id)
625 def _add_connector_types(self, factory, info):
626 if "connector_types" in info:
627 from_connections = dict()
628 to_connections = dict()
629 for connection in self._metadata.connections:
630 from_ = connection["from"]
631 to = connection["to"]
632 can_cross = connection["can_cross"]
633 init_code = connection.get("init_code")
634 compl_code = connection.get("compl_code")
635 if from_ not in from_connections:
636 from_connections[from_] = list()
637 if to not in to_connections:
638 to_connections[to] = list()
639 from_connections[from_].append((to, can_cross, init_code,
641 to_connections[to].append((from_, can_cross, init_code,
643 for connector_id in info["connector_types"]:
644 connector_type_info = self._metadata.connector_types[
646 name = connector_type_info["name"]
647 help = connector_type_info["help"]
648 max = connector_type_info["max"]
649 min = connector_type_info["min"]
650 testbed_id = self._testbed_id
651 factory_id = factory.factory_id
652 connector_type = ConnectorType(testbed_id, factory_id, name,
654 connector_key = (testbed_id, factory_id, name)
655 if connector_key in to_connections:
656 for (from_, can_cross, init_code, compl_code) in \
657 to_connections[connector_key]:
658 (testbed_id_from, factory_id_from, name_from) = from_
659 connector_type.add_from_connection(testbed_id_from,
660 factory_id_from, name_from, can_cross,
661 init_code, compl_code)
662 if connector_key in from_connections:
663 for (to, can_cross, init_code, compl_code) in \
664 from_connections[(testbed_id, factory_id, name)]:
665 (testbed_id_to, factory_id_to, name_to) = to
666 connector_type.add_to_connection(testbed_id_to,
667 factory_id_to, name_to, can_cross, init_code,
669 factory.add_connector_type(connector_type)