2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.core.connector import ConnectorType
6 from nepi.core.factory import Factory
9 import nepi.util.environ
10 from nepi.util import tags, validation
11 from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
12 DeploymentConfiguration as DC, \
13 AttributeCategories as AC
class Parallel(object):
    """Marker wrapping a factory id inside one of the ``*_order`` lists.

    Elements of a factory wrapped this way are meant to be processed in
    parallel rather than one after another.

    :param factory: the factory id being wrapped.
    :param maxthreads: stored as-is; presumably the upper bound on worker
        threads used for that factory -- confirm against the consumer.
    """

    def __init__(self, factory, maxthreads=64):
        self.factory = factory
        self.maxthreads = maxthreads
class MetadataInfo(object):
    """Description of a testbed, provided by the testbed's metadata module.

    Every member is a read-only property.  Members without a default raise
    ``NotImplementedError`` and have to be overridden by the testbed.
    """

    @property
    def connector_types(self):
        """ dictionary of dictionaries with allowed connection information.

            connector_type_id: dict({
                "name": connector type name,
                "help": help text,
                "max": maximum number of connections allowed (-1 for no limit),
                "min": minimum number of connections allowed
            })
        """
        raise NotImplementedError

    @property
    def connections(self):
        """ array of dictionaries with allowed connection information.

            dict({
                "from": (testbed_id1, factory_id1, connector_type_name1),
                "to": (testbed_id2, factory_id2, connector_type_name2),
                "init_code": connection function to invoke for connection
                    initiation
                "compl_code": connection function to invoke for connection
                    completion
                "can_cross": whether the connection can be done across testbed
                    instances
            })

        Any member of the "from" / "to" tuples may itself be a list, set or
        tuple of alternatives; one connection is registered per combination.
        """
        raise NotImplementedError

    @property
    def attributes(self):
        """ dictionary of dictionaries of all available attributes.

            attribute_id: dict({
                "name": attribute name,
                "help": help text,
                "type": attribute type,
                "value": default attribute value,
                "range": (maximum, minimum) values else None if not defined,
                "allowed": array of possible values,
                "flags": attributes flags,
                "validation_function": validation function for the attribute
                "category": category for the attribute
            })
        """
        raise NotImplementedError

    @property
    def traces(self):
        """ dictionary of dictionaries of all available traces.

            trace_id: dict({
                "name": trace name,
                "help": help text
            })
        """
        raise NotImplementedError

    @property
    def create_order(self):
        """ list of factory ids that indicates the order in which the elements
        should be instantiated. If wrapped within a Parallel instance, they
        will be instantiated in parallel.
        """
        raise NotImplementedError

    @property
    def configure_order(self):
        """ list of factory ids that indicates the order in which the elements
        should be configured. If wrapped within a Parallel instance, they
        will be configured in parallel.
        """
        raise NotImplementedError

    @property
    def preconfigure_order(self):
        """ list of factory ids that indicates the order in which the elements
        should be preconfigured. If wrapped within a Parallel instance, they
        will be configured in parallel.

        Default: same as configure_order
        """
        return self.configure_order

    @property
    def prestart_order(self):
        """ list of factory ids that indicates the order in which the elements
        should be prestart-configured. If wrapped within a Parallel instance,
        they will be configured in parallel.

        Default: same as configure_order
        """
        return self.configure_order

    @property
    def start_order(self):
        """ list of factory ids that indicates the order in which the elements
        should be started. If wrapped within a Parallel instance, they
        will be started in parallel.

        Default: same as configure_order
        """
        return self.configure_order

    @property
    def factories_info(self):
        """ dictionary of dictionaries of factory specific information

            factory_id: dict({
                "category": category the element belongs to,
                "create_function": function for element instantiation,
                "start_function": function for element starting,
                "stop_function": function for element stopping,
                "status_function": function for retrieving element status,
                "preconfigure_function": function for element preconfiguration,
                    (just after connections are made,
                    just before netrefs are resolved)
                "configure_function": function for element configuration,
                "prestart_function": function for pre-start
                    element configuration (just before starting applications),
                    useful for synchronization of background setup tasks or
                    lazy instantiation or configuration of attributes
                    that require connection/cross-connection state.
                    After this point, all applications should be able to run.
                "factory_attributes": list of references to attribute_ids,
                "box_attributes": list of references to attribute_ids,
                "traces": list of references to trace_id
                "tags": list of references to tag_id
                "connector_types": list of references to connector_types
            })
        """
        raise NotImplementedError

    @property
    def testbed_attributes(self):
        """ dictionary of attributes for testbed instance configuration

            attributes_id = dict({
                "name": attribute name,
                "help": help text,
                "type": attribute type,
                "value": default attribute value,
                "range": (maximum, minimum) values else None if not defined,
                "allowed": array of possible values,
                "flags": attributes flags,
                "validation_function": validation function for the attribute
                "category": category for the attribute
            })
        """
        raise NotImplementedError

    @property
    def testbed_id(self):
        """ ID for the testbed """
        raise NotImplementedError

    @property
    def testbed_version(self):
        """ version for the testbed """
        raise NotImplementedError
179 class Metadata(object):
# Wraps the MetadataInfo of one testbed (loaded in __init__) and turns it
# into factories and attribute maps; the class-level tables below hold the
# attribute definitions shared by every testbed.
180 # These attributes should be added to all boxes
# _box_attributes merges this table last, so its entries override box or
# tagged attributes stored under the same key.
# NOTE(review): the entry's key / "name" lines, the last flag and the closing
# brackets are not present in this copy -- confirm against the upstream file.
181 STANDARD_BOX_ATTRIBUTES = dict({
184 "validation_function" : validation.is_string,
185 "type" : Attribute.STRING,
186 "flags" : Attribute.ExecReadOnly |\
187 Attribute.ExecImmutable |\
189 "help" : "A unique identifier for referring to this box",
193 # These are the attribute definitions for tagged attributes
# Keyed by attribute id; STANDARD_TAGGED_BOX_ATTRIBUTES lists these ids per
# tag and _tagged_attributes looks them up here.
# NOTE(review): the default value, the last flag and the closing brackets are
# not present in this copy -- confirm against the upstream file.
194 STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS = dict({
195 "maxAddresses" : dict({
196 "name" : "maxAddresses",
197 "validation_function" : validation.is_integer,
198 "type" : Attribute.INTEGER,
200 "flags" : Attribute.DesignReadOnly |\
201 Attribute.ExecInvisible |\
203 "help" : "The maximum allowed number of addresses",
207 # Attributes to be added to all boxes with specific tags
# Maps a tag to the ids (keys of STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS) that
# _tagged_attributes adds for any factory carrying that tag.
# NOTE(review): the closing bracket is not present in this copy.
208 STANDARD_TAGGED_BOX_ATTRIBUTES = dict({
209 tags.ALLOW_ADDRESSES : ["maxAddresses"],
210 tags.HAS_ADDRESSES : ["maxAddresses"],
213 # These attributes should be added to all testbeds
# _testbed_attributes starts from a copy of this table and then applies the
# testbed's own definitions on top; DEPLOYMENT_ATTRIBUTES is merged into it
# further down in the class body.
# NOTE(review): the header of the second entry (the one whose help text speaks
# of a unique testbed identifier), the defaults, the last flags and the
# closing brackets are not present in this copy -- confirm against upstream.
214 STANDARD_TESTBED_ATTRIBUTES = dict({
215 "home_directory" : dict({
216 "name" : "homeDirectory",
217 "validation_function" : validation.is_string,
218 "help" : "Path to the directory where traces and other files will be stored",
219 "type" : Attribute.STRING,
221 "flags" : Attribute.ExecReadOnly |\
222 Attribute.ExecImmutable |\
227 "validation_function" : validation.is_string,
228 "type" : Attribute.STRING,
229 "flags" : Attribute.ExecReadOnly |\
230 Attribute.ExecImmutable |\
232 "help" : "A unique identifier for referring to this testbed",
236 # These attributes should be added to all testbeds
# Deployment settings, keyed by DeploymentConfiguration (DC) constants.  Each
# entry carries name / help / type / flags / validation_function / category
# and, for some, a default "value".  The table is merged into both
# PROXY_ATTRIBUTES and STANDARD_TESTBED_ATTRIBUTES later in the class body.
# NOTE(review): in this copy the "allowed" lists of the ENUM entries, several
# default values, the last flag of every entry, the DC.USE_SUDO entry header
# and all closing brackets are missing -- confirm against the upstream file.
237 DEPLOYMENT_ATTRIBUTES = dict({
238 # TESTBED DEPLOYMENT ATTRIBUTES
239 DC.DEPLOYMENT_ENVIRONMENT_SETUP : dict({
240 "name" : DC.DEPLOYMENT_ENVIRONMENT_SETUP,
241 "validation_function" : validation.is_string,
242 "help" : "Shell commands to run before spawning TestbedController processes",
243 "type" : Attribute.STRING,
244 "flags" : Attribute.ExecReadOnly |\
245 Attribute.ExecImmutable |\
247 "category" : AC.CATEGORY_DEPLOYMENT,
249 DC.DEPLOYMENT_MODE: dict({
250 "name" : DC.DEPLOYMENT_MODE,
251 "help" : "Instance execution mode",
252 "type" : Attribute.ENUM,
253 "value" : DC.MODE_SINGLE_PROCESS,
256 DC.MODE_SINGLE_PROCESS
258 "flags" : Attribute.ExecReadOnly |\
259 Attribute.ExecImmutable |\
261 "validation_function" : validation.is_enum,
262 "category" : AC.CATEGORY_DEPLOYMENT,
264 DC.DEPLOYMENT_COMMUNICATION : dict({
265 "name" : DC.DEPLOYMENT_COMMUNICATION,
266 "help" : "Instance communication mode",
267 "type" : Attribute.ENUM,
268 "value" : DC.ACCESS_LOCAL,
273 "flags" : Attribute.ExecReadOnly |\
274 Attribute.ExecImmutable |\
276 "validation_function" : validation.is_enum,
277 "category" : AC.CATEGORY_DEPLOYMENT,
279 DC.DEPLOYMENT_HOST : dict({
280 "name" : DC.DEPLOYMENT_HOST,
281 "help" : "Host where the testbed will be executed",
282 "type" : Attribute.STRING,
283 "value" : "localhost",
284 "flags" : Attribute.ExecReadOnly |\
285 Attribute.ExecImmutable |\
287 "validation_function" : validation.is_string,
288 "category" : AC.CATEGORY_DEPLOYMENT,
290 DC.DEPLOYMENT_USER : dict({
291 "name" : DC.DEPLOYMENT_USER,
292 "help" : "User on the Host to execute the testbed",
293 "type" : Attribute.STRING,
# The default is computed once, when this class body is executed, and is the
# user running that process.
# NOTE(review): no `import getpass` is visible in this copy's import block.
294 "value" : getpass.getuser(),
295 "flags" : Attribute.ExecReadOnly |\
296 Attribute.ExecImmutable |\
298 "validation_function" : validation.is_string,
299 "category" : AC.CATEGORY_DEPLOYMENT,
301 DC.DEPLOYMENT_KEY : dict({
302 "name" : DC.DEPLOYMENT_KEY,
303 "help" : "Path to SSH key to use for connecting",
304 "type" : Attribute.STRING,
305 "flags" : Attribute.ExecReadOnly |\
306 Attribute.ExecImmutable |\
308 "validation_function" : validation.is_string,
309 "category" : AC.CATEGORY_DEPLOYMENT,
311 DC.DEPLOYMENT_PORT : dict({
312 "name" : DC.DEPLOYMENT_PORT,
313 "help" : "Port on the Host",
314 "type" : Attribute.INTEGER,
316 "flags" : Attribute.ExecReadOnly |\
317 Attribute.ExecImmutable |\
319 "validation_function" : validation.is_integer,
320 "category" : AC.CATEGORY_DEPLOYMENT,
322 DC.ROOT_DIRECTORY : dict({
323 "name" : DC.ROOT_DIRECTORY,
324 "help" : "Root directory for storing process files",
325 "type" : Attribute.STRING,
327 "flags" : Attribute.ExecReadOnly |\
328 Attribute.ExecImmutable |\
330 "validation_function" : validation.is_string, # TODO: validation.is_path
331 "category" : AC.CATEGORY_DEPLOYMENT,
333 DC.USE_AGENT : dict({
334 "name" : DC.USE_AGENT,
335 "help" : "Use -A option for forwarding of the authentication agent, if ssh access is used",
336 "type" : Attribute.BOOL,
338 "flags" : Attribute.ExecReadOnly |\
339 Attribute.ExecImmutable |\
341 "validation_function" : validation.is_bool,
342 "category" : AC.CATEGORY_DEPLOYMENT,
# NOTE(review): the help text below contains typos ("deamon", "flace"); it is
# a runtime string, so it is left untouched here.
345 "name" : DC.USE_SUDO,
346 "help" : "Use sudo to run the deamon process. This option only take flace when the server runs in daemon mode.",
347 "type" : Attribute.BOOL,
349 "flags" : Attribute.ExecReadOnly |\
350 Attribute.ExecImmutable |\
352 "validation_function" : validation.is_bool,
353 "category" : AC.CATEGORY_DEPLOYMENT,
355 DC.LOG_LEVEL : dict({
356 "name" : DC.LOG_LEVEL,
357 "help" : "Log level for instance",
358 "type" : Attribute.ENUM,
359 "value" : DC.ERROR_LEVEL,
364 "flags" : Attribute.ExecReadOnly |\
365 Attribute.ExecImmutable |\
367 "validation_function" : validation.is_enum,
368 "category" : AC.CATEGORY_DEPLOYMENT,
370 DC.RECOVERY_POLICY : dict({
371 "name" : DC.RECOVERY_POLICY,
372 "help" : "Specifies what action to take in the event of a failure.",
373 "type" : Attribute.ENUM,
374 "value" : DC.POLICY_FAIL,
380 "flags" : Attribute.ExecReadOnly |\
381 Attribute.ExecImmutable |\
383 "validation_function" : validation.is_enum,
384 "category" : AC.CATEGORY_DEPLOYMENT,
# Attribute table made of one boolean deployment-category entry (per its help
# text: reconnect to running instances instead of instantiating testbeds) plus
# every entry of DEPLOYMENT_ATTRIBUTES, added by the update() call below.
# update() stores references, so the per-attribute dictionaries are shared
# with DEPLOYMENT_ATTRIBUTES.
# NOTE(review): the entry's key / "name" lines, default value, last flag and
# closing brackets are not present in this copy -- confirm against upstream.
387 PROXY_ATTRIBUTES = dict({
390 "help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
391 "type" : Attribute.BOOL,
393 "flags" : Attribute.ExecReadOnly |\
394 Attribute.ExecImmutable |\
396 "validation_function" : validation.is_bool,
397 "category" : AC.CATEGORY_DEPLOYMENT,
400 PROXY_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES)
402 # These attributes could appear in the boxes attribute list
# _box_attributes copies this table, overlays the testbed's own attribute
# definitions on the copy, and resolves the ids listed under a factory's
# "box_attributes" against the result -- so a testbed definition with the
# same id replaces the one given here.
# NOTE(review): only "tun_proto" (by name), "tun_cipher" and
# ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP have their headers in this copy; the
# headers of three tunnel entries (key, address, port per their help texts),
# the "allowed" list of "tun_cipher", the last flags and the closing brackets
# are missing -- confirm against the upstream file.
403 STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({
405 "name" : "tun_proto",
406 "help" : "TUNneling protocol used",
407 "type" : Attribute.STRING,
408 "flags" : Attribute.DesignInvisible | \
409 Attribute.ExecInvisible | \
410 Attribute.ExecImmutable | \
412 "validation_function" : validation.is_string,
416 "help" : "Randomly selected TUNneling protocol cryptographic key. "
417 "Endpoints must agree to use the minimum (in lexicographic order) "
418 "of both the remote and local sides.",
419 "type" : Attribute.STRING,
420 "flags" : Attribute.DesignInvisible | \
421 Attribute.ExecInvisible | \
422 Attribute.ExecImmutable | \
424 "validation_function" : validation.is_string,
428 "help" : "Address (IP, unix socket, whatever) of the tunnel endpoint",
429 "type" : Attribute.STRING,
430 "flags" : Attribute.DesignInvisible | \
431 Attribute.ExecInvisible | \
432 Attribute.ExecImmutable | \
434 "validation_function" : validation.is_string,
438 "help" : "IP port of the tunnel endpoint",
439 "type" : Attribute.INTEGER,
440 "flags" : Attribute.DesignInvisible | \
441 Attribute.ExecInvisible | \
442 Attribute.ExecImmutable | \
444 "validation_function" : validation.is_integer,
446 "tun_cipher" : dict({
447 "name" : "tun_cipher",
448 "help" : "Cryptographic cipher used for tunnelling",
449 "type" : Attribute.ENUM,
458 "flags" : Attribute.ExecImmutable,
459 "validation_function" : validation.is_enum,
461 ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP : dict({
462 "name" : ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
463 "help" : "Commands to set up the environment needed to run NEPI testbeds",
464 "type" : Attribute.STRING,
465 "flags" : Attribute.DesignInvisible | \
466 Attribute.ExecInvisible | \
467 Attribute.ExecImmutable | \
469 "validation_function" : validation.is_string
# Every testbed also gets the deployment attributes.  copy() is shallow: the
# outer mapping is duplicated, the per-attribute dictionaries are shared with
# DEPLOYMENT_ATTRIBUTES.
473 STANDARD_TESTBED_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES.copy())
475 def __init__(self, testbed_id):
476 self._testbed_id = testbed_id
477 metadata_module = self._load_metadata_module()
478 self._metadata = metadata_module.MetadataInfo()
479 if testbed_id != self._metadata.testbed_id:
480 raise RuntimeError("Bad testbed id. Asked for %s, got %s" % \
481 (testbed_id, self._metadata.testbed_id ))
# NOTE(review): the wrapped MetadataInfo member of the same name is read as an
# attribute; a @property decorator may have been lost above this def -- confirm.
def create_order(self):
    """Return the ``create_order`` reported by the testbed metadata."""
    return self._metadata.create_order
# NOTE(review): the wrapped MetadataInfo member of the same name is read as an
# attribute; a @property decorator may have been lost above this def -- confirm.
def configure_order(self):
    """Return the ``configure_order`` reported by the testbed metadata."""
    return self._metadata.configure_order
# NOTE(review): the wrapped MetadataInfo member of the same name is read as an
# attribute; a @property decorator may have been lost above this def -- confirm.
def preconfigure_order(self):
    """Return the ``preconfigure_order`` reported by the testbed metadata."""
    return self._metadata.preconfigure_order
# NOTE(review): the wrapped MetadataInfo member of the same name is read as an
# attribute; a @property decorator may have been lost above this def -- confirm.
def prestart_order(self):
    """Return the ``prestart_order`` reported by the testbed metadata."""
    return self._metadata.prestart_order
# NOTE(review): the wrapped MetadataInfo member of the same name is read as an
# attribute; a @property decorator may have been lost above this def -- confirm.
def start_order(self):
    """Return the ``start_order`` reported by the testbed metadata."""
    return self._metadata.start_order
504 def testbed_version(self):
505 return self._metadata.testbed_version
508 def testbed_id(self):
509 return self._testbed_id
# NOTE(review): sibling members on MetadataInfo are read as attributes; a
# @property decorator may have been lost above this def -- confirm.
def supported_recovery_policies(self):
    """Return the ``supported_recovery_policies`` of the testbed metadata."""
    return self._metadata.supported_recovery_policies
def testbed_attributes(self):
    """Build the attribute map describing this testbed's configuration.

    A fresh ``AttributesMap`` is filled, through ``_add_attributes``, with
    the definitions produced by ``_testbed_attributes``.

    :returns: the populated ``AttributesMap``.
    """
    attributes = AttributesMap()
    self._add_attributes(attributes.add_attribute, self._testbed_attributes())
    # Hand the populated map back; without this the work above is discarded.
    return attributes
# For every entry of the metadata's factories_info, creates a Factory and
# attaches to it, in this order: factory attributes, box attributes, traces,
# tags and connector types; the factory is then appended to `factories`.
# The *_function entries are optional (read with .get(), so None when
# absent); "category" is mandatory (KeyError when absent).
# NOTE(review): the initialisation of `factories`, most of the Factory(...)
# arguments and any return statement are not present in this copy -- confirm
# against the upstream file before relying on this method.
# NOTE(review): dict.iteritems() exists on Python 2 only.
521 def build_factories(self):
523 for factory_id, info in self._metadata.factories_info.iteritems():
524 create_function = info.get("create_function")
525 start_function = info.get("start_function")
526 stop_function = info.get("stop_function")
527 status_function = info.get("status_function")
528 configure_function = info.get("configure_function")
529 preconfigure_function = info.get("preconfigure_function")
530 prestart_function = info.get("prestart_function")
532 category = info["category"]
533 factory = Factory(factory_id,
539 preconfigure_function,
# Factory-level attributes and per-box attributes go through the same helper,
# only the registration callback differs.
544 factory_attributes = self._factory_attributes(info)
545 self._add_attributes(factory.add_attribute, factory_attributes)
546 box_attributes = self._box_attributes(info)
547 self._add_attributes(factory.add_box_attribute, box_attributes)
549 self._add_traces(factory, info)
550 self._add_tags(factory, info)
551 self._add_connector_types(factory, info)
552 factories.append(factory)
def _load_metadata_module(self):
    """Return the ``metadata`` module of this testbed's package.

    The package name comes from ``nepi.util.environ.find_testbed``; the
    module is imported on first use and taken from the import cache
    afterwards.

    :returns: the module object ``<testbed package>.metadata``.
    :raises ImportError: if the module cannot be imported.
    """
    # Function-scope import: keeps this method independent of the file's
    # import block.
    import importlib

    mod_name = nepi.util.environ.find_testbed(self._testbed_id) + ".metadata"
    # import_module returns the cached module when it is already loaded and
    # imports it otherwise.
    return importlib.import_module(mod_name)
561 def _testbed_attributes(self):
563 attributes = self.STANDARD_TESTBED_ATTRIBUTES.copy()
565 attributes.update(self._metadata.testbed_attributes.copy())
568 def _factory_attributes(self, info):
569 tagged_attributes = self._tagged_attributes(info)
570 if "factory_attributes" in info:
571 definitions = self._metadata.attributes.copy()
572 # filter attributes corresponding to the factory_id
573 factory_attributes = self._filter_attributes(info["factory_attributes"],
576 factory_attributes = dict()
577 attributes = dict(tagged_attributes.items() + \
578 factory_attributes.items())
581 def _box_attributes(self, info):
582 tagged_attributes = self._tagged_attributes(info)
583 if "box_attributes" in info:
584 definitions = self.STANDARD_BOX_ATTRIBUTE_DEFINITIONS.copy()
585 definitions.update(self._metadata.attributes)
586 box_attributes = self._filter_attributes(info["box_attributes"],
589 box_attributes = dict()
590 attributes = dict(tagged_attributes.items() + \
591 box_attributes.items())
592 attributes.update(self.STANDARD_BOX_ATTRIBUTES.copy())
595 def _tagged_attributes(self, info):
596 tagged_attributes = dict()
597 for tag_id in info.get("tags", []):
598 if tag_id in self.STANDARD_TAGGED_BOX_ATTRIBUTES:
599 attr_list = self.STANDARD_TAGGED_BOX_ATTRIBUTES[tag_id]
600 attributes = self._filter_attributes(attr_list,
601 self.STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS)
602 tagged_attributes.update(attributes)
603 return tagged_attributes
605 def _filter_attributes(self, attr_list, definitions):
606 # filter attributes not corresponding to the factory
607 attributes = dict((attr_id, definitions[attr_id]) \
608 for attr_id in attr_list)
611 def _add_attributes(self, add_attr_func, attributes):
612 for attr_id, attr_info in attributes.iteritems():
613 name = attr_info["name"]
614 help = attr_info["help"]
615 type = attr_info["type"]
616 value = attr_info.get("value")
617 range = attr_info.get("range")
618 allowed = attr_info.get("allowed")
619 flags = attr_info.get("flags")
620 validation_function = attr_info["validation_function"]
621 category = attr_info.get("category")
622 add_attr_func(name, help, type, value, range, allowed, flags,
623 validation_function, category)
625 def _add_traces(self, factory, info):
626 for trace_id in info.get("traces", []):
627 trace_info = self._metadata.traces[trace_id]
628 name = trace_info["name"]
629 help = trace_info["help"]
630 factory.add_trace(name, help)
632 def _add_tags(self, factory, info):
633 for tag_id in info.get("tags", []):
634 factory.add_tag(tag_id)
def _add_connector_types(self, factory, info):
    """Create the connector types of a factory and wire their connections.

    Does nothing unless ``info`` has a "connector_types" key.  Otherwise
    every connection declared by the metadata is indexed by both of its
    endpoints, and for each listed connector type id a ``ConnectorType``
    is built, given the connections that end at it (``add_from_connection``)
    and those that start at it (``add_to_connection``), and added to the
    factory.

    :param factory: factory receiving the connector types; its
        ``factory_id`` is part of each connector's key.
    :param info: the factory's entry of ``factories_info``.
    :raises KeyError: if a connection lacks "from", "to" or "can_cross",
        or a connector type id or one of its mandatory keys is missing.
    """
    if "connector_types" not in info:
        return

    # Endpoint -> list of (other endpoint, can_cross, init_code, compl_code).
    # Endpoints are (testbed_id, factory_id, connector_type_name) tuples.
    from_connections = dict()
    to_connections = dict()
    for connection in self._metadata.connections:
        froms = connection["from"]
        tos = connection["to"]
        can_cross = connection["can_cross"]
        init_code = connection.get("init_code")
        compl_code = connection.get("compl_code")

        # An endpoint member may list alternatives; _expand yields one
        # concrete endpoint per combination.
        for from_ in _expand(froms):
            for to in _expand(tos):
                from_connections.setdefault(from_, []).append(
                    (to, can_cross, init_code, compl_code))
                to_connections.setdefault(to, []).append(
                    (from_, can_cross, init_code, compl_code))

    for connector_id in info["connector_types"]:
        connector_type_info = self._metadata.connector_types[connector_id]
        name = connector_type_info["name"]
        help_text = connector_type_info["help"]
        max_connections = connector_type_info["max"]
        min_connections = connector_type_info["min"]
        testbed_id = self._testbed_id
        factory_id = factory.factory_id
        connector_type = ConnectorType(testbed_id, factory_id, name,
                help_text, max_connections, min_connections)
        connector_key = (testbed_id, factory_id, name)

        # Connections arriving at this connector.
        for (from_, can_cross, init_code, compl_code) in \
                to_connections.get(connector_key, ()):
            (testbed_id_from, factory_id_from, name_from) = from_
            connector_type.add_from_connection(testbed_id_from,
                    factory_id_from, name_from, can_cross,
                    init_code, compl_code)

        # Connections leaving from this connector.
        for (to, can_cross, init_code, compl_code) in \
                from_connections.get(connector_key, ()):
            (testbed_id_to, factory_id_to, name_to) = to
            connector_type.add_to_connection(testbed_id_to,
                    factory_id_to, name_to, can_cross,
                    init_code, compl_code)

        factory.add_connector_type(connector_type)
688 Expands multiple values in the "val" tuple to create cross products:
690 >>> list(_expand((1,2,3)))
692 >>> list(_expand((1,(2,4,5),3)))
693 [(1, 2, 3), (1, 4, 3), (1, 5, 3)]
694 >>> list(_expand(((1,2),(2,4,5),3)))
695 [(1, 2, 3), (1, 4, 3), (1, 5, 3), (2, 2, 3), (2, 4, 3), (2, 5, 3)]
699 elif isinstance(val[0], (list,set,tuple)):
702 for e_val in _expand(val[1:]):
706 for e_val in _expand(val[1:]):