1 # -*- coding: utf-8 -*-
3 from nepi.core.attributes import Attribute, AttributesMap
4 from nepi.core.connector import ConnectorType
5 from nepi.core.factory import Factory
8 import nepi.util.environ
9 from nepi.util import tags, validation
10 from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
11 DeploymentConfiguration as DC, \
12 AttributeCategories as AC
14 class Parallel(object):
15 def __init__(self, factory, maxthreads = 64):
16 self.factory = factory
17 self.maxthreads = maxthreads
19 class MetadataInfo(object):
21 def connector_types(self):
22 """ dictionary of dictionaries with allowed connection information.
25 "name": connector type name,
26 "max": maximum number of connections allowed (-1 for no limit),
27 "min": minimum number of connections allowed
30 raise NotImplementedError
33 def connections(self):
34 """ array of dictionaries with allowed connection information.
36 "from": (testbed_id1, factory_id1, connector_type_name1),
37 "to": (testbed_id2, factory_id2, connector_type_name2),
38 "init_code": connection function to invoke for connection initiation
39 "compl_code": connection function to invoke for connection
41 "can_cross": whether the connection can be done across testbed
45 raise NotImplementedError
49 """ dictionary of dictionaries of all available attributes.
51 "name": attribute name,
53 "type": attribute type,
54 "value": default attribute value,
55 "range": (maximum, minimun) values else None if not defined,
56 "allowed": array of posible values,
57 "flags": attributes flags,
58 "validation_function": validation function for the attribute
59 "category": category for the attribute
62 raise NotImplementedError
66 """ dictionary of dictionaries of all available traces.
72 raise NotImplementedError
75 def create_order(self):
76 """ list of factory ids that indicates the order in which the elements
77 should be instantiated. If wrapped within a Parallel instance, they
78 will be instantiated in parallel.
80 raise NotImplementedError
83 def configure_order(self):
84 """ list of factory ids that indicates the order in which the elements
85 should be configured. If wrapped within a Parallel instance, they
86 will be configured in parallel.
88 raise NotImplementedError
91 def preconfigure_order(self):
92 """ list of factory ids that indicates the order in which the elements
93 should be preconfigured. If wrapped within a Parallel instance, they
94 will be configured in parallel.
96 Default: same as configure_order
98 return self.configure_order
101 def prestart_order(self):
102 """ list of factory ids that indicates the order in which the elements
103 should be prestart-configured. If wrapped within a Parallel instance, they
104 will be configured in parallel.
106 Default: same as configure_order
108 return self.configure_order
111 def start_order(self):
112 """ list of factory ids that indicates the order in which the elements
113 should be started. If wrapped within a Parallel instance, they
114 will be started in parallel.
116 Default: same as configure_order
118 return self.configure_order
121 def factories_info(self):
122 """ dictionary of dictionaries of factory specific information
125 "category": category the element belongs to,
126 "create_function": function for element instantiation,
127 "start_function": function for element starting,
128 "stop_function": function for element stoping,
129 "status_function": function for retrieving element status,
130 "preconfigure_function": function for element preconfiguration,
131 (just after connections are made,
132 just before netrefs are resolved)
133 "configure_function": function for element configuration,
134 "prestart_function": function for pre-start
135 element configuration (just before starting applications),
136 useful for synchronization of background setup tasks or
137 lazy instantiation or configuration of attributes
138 that require connection/cross-connection state before
140 After this point, all applications should be able to run.
141 "factory_attributes": list of references to attribute_ids,
142 "box_attributes": list of regerences to attribute_ids,
143 "traces": list of references to trace_id
144 "tags": list of references to tag_id
145 "connector_types": list of references to connector_types
148 raise NotImplementedError
151 def testbed_attributes(self):
152 """ dictionary of attributes for testbed instance configuration
153 attributes_id = dict({
154 "name": attribute name,
156 "type": attribute type,
157 "value": default attribute value,
158 "range": (maximum, minimun) values else None if not defined,
159 "allowed": array of posible values,
160 "flags": attributes flags,
161 "validation_function": validation function for the attribute
162 "category": category for the attribute
166 raise NotImplementedError
169 def testbed_id(self):
170 """ ID for the testbed """
171 raise NotImplementedError
174 def testbed_version(self):
175 """ version for the testbed """
176 raise NotImplementedError
178 class Metadata(object):
179 # These attributes should be added to all boxes
180 STANDARD_BOX_ATTRIBUTES = dict({
183 "validation_function" : validation.is_string,
184 "type" : Attribute.STRING,
185 "flags" : Attribute.ExecReadOnly |\
186 Attribute.ExecImmutable |\
188 "help" : "A unique identifier for referring to this box",
192 # These are the attribute definitions for tagged attributes
193 STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS = dict({
194 "maxAddresses" : dict({
195 "name" : "maxAddresses",
196 "validation_function" : validation.is_integer,
197 "type" : Attribute.INTEGER,
199 "flags" : Attribute.DesignReadOnly |\
200 Attribute.ExecInvisible |\
202 "help" : "The maximum allowed number of addresses",
206 # Attributes to be added to all boxes with specific tags
207 STANDARD_TAGGED_BOX_ATTRIBUTES = dict({
208 tags.ALLOW_ADDRESSES : ["maxAddresses"],
209 tags.HAS_ADDRESSES : ["maxAddresses"],
212 # These attributes should be added to all testbeds
213 STANDARD_TESTBED_ATTRIBUTES = dict({
214 "home_directory" : dict({
215 "name" : "homeDirectory",
216 "validation_function" : validation.is_string,
217 "help" : "Path to the directory where traces and other files will be stored",
218 "type" : Attribute.STRING,
220 "flags" : Attribute.ExecReadOnly |\
221 Attribute.ExecImmutable |\
226 "validation_function" : validation.is_string,
227 "type" : Attribute.STRING,
228 "flags" : Attribute.ExecReadOnly |\
229 Attribute.ExecImmutable |\
231 "help" : "A unique identifier for referring to this testbed",
235 # These attributes should be added to all testbeds
236 DEPLOYMENT_ATTRIBUTES = dict({
237 # TESTBED DEPLOYMENT ATTRIBUTES
238 DC.DEPLOYMENT_ENVIRONMENT_SETUP : dict({
239 "name" : DC.DEPLOYMENT_ENVIRONMENT_SETUP,
240 "validation_function" : validation.is_string,
241 "help" : "Shell commands to run before spawning TestbedController processes",
242 "type" : Attribute.STRING,
243 "flags" : Attribute.ExecReadOnly |\
244 Attribute.ExecImmutable |\
246 "category" : AC.CATEGORY_DEPLOYMENT,
248 DC.DEPLOYMENT_MODE: dict({
249 "name" : DC.DEPLOYMENT_MODE,
250 "help" : "Instance execution mode",
251 "type" : Attribute.ENUM,
252 "value" : DC.MODE_SINGLE_PROCESS,
255 DC.MODE_SINGLE_PROCESS
257 "flags" : Attribute.ExecReadOnly |\
258 Attribute.ExecImmutable |\
260 "validation_function" : validation.is_enum,
261 "category" : AC.CATEGORY_DEPLOYMENT,
263 DC.DEPLOYMENT_COMMUNICATION : dict({
264 "name" : DC.DEPLOYMENT_COMMUNICATION,
265 "help" : "Instance communication mode",
266 "type" : Attribute.ENUM,
267 "value" : DC.ACCESS_LOCAL,
272 "flags" : Attribute.ExecReadOnly |\
273 Attribute.ExecImmutable |\
275 "validation_function" : validation.is_enum,
276 "category" : AC.CATEGORY_DEPLOYMENT,
278 DC.DEPLOYMENT_HOST : dict({
279 "name" : DC.DEPLOYMENT_HOST,
280 "help" : "Host where the testbed will be executed",
281 "type" : Attribute.STRING,
282 "value" : "localhost",
283 "flags" : Attribute.ExecReadOnly |\
284 Attribute.ExecImmutable |\
286 "validation_function" : validation.is_string,
287 "category" : AC.CATEGORY_DEPLOYMENT,
289 DC.DEPLOYMENT_USER : dict({
290 "name" : DC.DEPLOYMENT_USER,
291 "help" : "User on the Host to execute the testbed",
292 "type" : Attribute.STRING,
293 "value" : getpass.getuser(),
294 "flags" : Attribute.ExecReadOnly |\
295 Attribute.ExecImmutable |\
297 "validation_function" : validation.is_string,
298 "category" : AC.CATEGORY_DEPLOYMENT,
300 DC.DEPLOYMENT_KEY : dict({
301 "name" : DC.DEPLOYMENT_KEY,
302 "help" : "Path to SSH key to use for connecting",
303 "type" : Attribute.STRING,
304 "flags" : Attribute.ExecReadOnly |\
305 Attribute.ExecImmutable |\
307 "validation_function" : validation.is_string,
308 "category" : AC.CATEGORY_DEPLOYMENT,
310 DC.DEPLOYMENT_PORT : dict({
311 "name" : DC.DEPLOYMENT_PORT,
312 "help" : "Port on the Host",
313 "type" : Attribute.INTEGER,
315 "flags" : Attribute.ExecReadOnly |\
316 Attribute.ExecImmutable |\
318 "validation_function" : validation.is_integer,
319 "category" : AC.CATEGORY_DEPLOYMENT,
321 DC.ROOT_DIRECTORY : dict({
322 "name" : DC.ROOT_DIRECTORY,
323 "help" : "Root directory for storing process files",
324 "type" : Attribute.STRING,
326 "flags" : Attribute.ExecReadOnly |\
327 Attribute.ExecImmutable |\
329 "validation_function" : validation.is_string, # TODO: validation.is_path
330 "category" : AC.CATEGORY_DEPLOYMENT,
332 DC.USE_AGENT : dict({
333 "name" : DC.USE_AGENT,
334 "help" : "Use -A option for forwarding of the authentication agent, if ssh access is used",
335 "type" : Attribute.BOOL,
337 "flags" : Attribute.ExecReadOnly |\
338 Attribute.ExecImmutable |\
340 "validation_function" : validation.is_bool,
341 "category" : AC.CATEGORY_DEPLOYMENT,
344 "name" : DC.USE_SUDO,
345 "help" : "Use sudo to run the deamon process. This option only take flace when the server runs in daemon mode.",
346 "type" : Attribute.BOOL,
348 "flags" : Attribute.ExecReadOnly |\
349 Attribute.ExecImmutable |\
351 "validation_function" : validation.is_bool,
352 "category" : AC.CATEGORY_DEPLOYMENT,
354 DC.CLEAN_ROOT : dict({
355 "name" : DC.CLEAN_ROOT,
356 "help" : "Clean server root directory (Warning: This will erase previous data).",
357 "type" : Attribute.BOOL,
359 "flags" : Attribute.ExecReadOnly |\
360 Attribute.ExecImmutable |\
362 "validation_function" : validation.is_bool,
363 "category" : AC.CATEGORY_DEPLOYMENT,
365 DC.LOG_LEVEL : dict({
366 "name" : DC.LOG_LEVEL,
367 "help" : "Log level for instance",
368 "type" : Attribute.ENUM,
369 "value" : DC.ERROR_LEVEL,
374 "flags" : Attribute.ExecReadOnly |\
375 Attribute.ExecImmutable |\
377 "validation_function" : validation.is_enum,
378 "category" : AC.CATEGORY_DEPLOYMENT,
380 DC.RECOVERY_POLICY : dict({
381 "name" : DC.RECOVERY_POLICY,
382 "help" : "Specifies what action to take in the event of a failure.",
383 "type" : Attribute.ENUM,
384 "value" : DC.POLICY_FAIL,
390 "flags" : Attribute.ExecReadOnly |\
391 Attribute.ExecImmutable |\
393 "validation_function" : validation.is_enum,
394 "category" : AC.CATEGORY_DEPLOYMENT,
397 PROXY_ATTRIBUTES = dict({
400 "help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
401 "type" : Attribute.BOOL,
403 "flags" : Attribute.ExecReadOnly |\
404 Attribute.ExecImmutable |\
406 "validation_function" : validation.is_bool,
407 "category" : AC.CATEGORY_DEPLOYMENT,
410 PROXY_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES)
412 # These attributes could appear in the boxes attribute list
413 STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({
415 "name" : "tun_proto",
416 "help" : "TUNneling protocol used",
417 "type" : Attribute.STRING,
418 "flags" : Attribute.DesignInvisible | \
419 Attribute.ExecInvisible | \
420 Attribute.ExecImmutable | \
422 "validation_function" : validation.is_string,
426 "help" : "Randomly selected TUNneling protocol cryptographic key. "
427 "Endpoints must agree to use the minimum (in lexicographic order) "
428 "of both the remote and local sides.",
429 "type" : Attribute.STRING,
430 "flags" : Attribute.DesignInvisible | \
431 Attribute.ExecInvisible | \
432 Attribute.ExecImmutable | \
434 "validation_function" : validation.is_string,
438 "help" : "Address (IP, unix socket, whatever) of the tunnel endpoint",
439 "type" : Attribute.STRING,
440 "flags" : Attribute.DesignInvisible | \
441 Attribute.ExecInvisible | \
442 Attribute.ExecImmutable | \
444 "validation_function" : validation.is_string,
448 "help" : "IP port of the tunnel endpoint",
449 "type" : Attribute.INTEGER,
450 "flags" : Attribute.DesignInvisible | \
451 Attribute.ExecInvisible | \
452 Attribute.ExecImmutable | \
454 "validation_function" : validation.is_integer,
456 "tun_cipher" : dict({
457 "name" : "tun_cipher",
458 "help" : "Cryptographic cipher used for tunnelling",
459 "type" : Attribute.ENUM,
468 "flags" : Attribute.ExecImmutable | \
470 "validation_function" : validation.is_enum,
472 ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP : dict({
473 "name" : ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
474 "help" : "Commands to set up the environment needed to run NEPI testbeds",
475 "type" : Attribute.STRING,
476 "flags" : Attribute.DesignInvisible | \
477 Attribute.ExecInvisible | \
478 Attribute.ExecImmutable | \
480 "validation_function" : validation.is_string
484 STANDARD_TESTBED_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES.copy())
486 def __init__(self, testbed_id):
487 self._testbed_id = testbed_id
488 metadata_module = self._load_metadata_module()
489 self._metadata = metadata_module.MetadataInfo()
490 if testbed_id != self._metadata.testbed_id:
491 raise RuntimeError("Bad testbed id. Asked for %s, got %s" % \
492 (testbed_id, self._metadata.testbed_id ))
495 def create_order(self):
496 return self._metadata.create_order
499 def configure_order(self):
500 return self._metadata.configure_order
503 def preconfigure_order(self):
504 return self._metadata.preconfigure_order
507 def prestart_order(self):
508 return self._metadata.prestart_order
511 def start_order(self):
512 return self._metadata.start_order
515 def testbed_version(self):
516 return self._metadata.testbed_version
519 def testbed_id(self):
520 return self._testbed_id
523 def supported_recovery_policies(self):
524 return self._metadata.supported_recovery_policies
526 def testbed_attributes(self):
527 attributes = AttributesMap()
528 testbed_attributes = self._testbed_attributes()
529 self._add_attributes(attributes.add_attribute, testbed_attributes)
532 def build_factories(self):
534 for factory_id, info in self._metadata.factories_info.iteritems():
535 create_function = info.get("create_function")
536 start_function = info.get("start_function")
537 stop_function = info.get("stop_function")
538 status_function = info.get("status_function")
539 configure_function = info.get("configure_function")
540 preconfigure_function = info.get("preconfigure_function")
541 prestart_function = info.get("prestart_function")
543 category = info["category"]
544 factory = Factory(factory_id,
550 preconfigure_function,
555 factory_attributes = self._factory_attributes(info)
556 self._add_attributes(factory.add_attribute, factory_attributes)
557 box_attributes = self._box_attributes(info)
558 self._add_attributes(factory.add_box_attribute, box_attributes)
560 self._add_traces(factory, info)
561 self._add_tags(factory, info)
562 self._add_connector_types(factory, info)
563 factories.append(factory)
566 def _load_metadata_module(self):
567 mod_name = nepi.util.environ.find_testbed(self._testbed_id) + ".metadata"
568 if not mod_name in sys.modules:
570 return sys.modules[mod_name]
572 def _testbed_attributes(self):
574 attributes = self.STANDARD_TESTBED_ATTRIBUTES.copy()
576 attributes.update(self._metadata.testbed_attributes.copy())
579 def _factory_attributes(self, info):
580 tagged_attributes = self._tagged_attributes(info)
581 if "factory_attributes" in info:
582 definitions = self._metadata.attributes.copy()
583 # filter attributes corresponding to the factory_id
584 factory_attributes = self._filter_attributes(info["factory_attributes"],
587 factory_attributes = dict()
588 attributes = dict(tagged_attributes.items() + \
589 factory_attributes.items())
592 def _box_attributes(self, info):
593 tagged_attributes = self._tagged_attributes(info)
594 if "box_attributes" in info:
595 definitions = self.STANDARD_BOX_ATTRIBUTE_DEFINITIONS.copy()
596 definitions.update(self._metadata.attributes)
597 box_attributes = self._filter_attributes(info["box_attributes"],
600 box_attributes = dict()
601 attributes = dict(tagged_attributes.items() + \
602 box_attributes.items())
603 attributes.update(self.STANDARD_BOX_ATTRIBUTES.copy())
606 def _tagged_attributes(self, info):
607 tagged_attributes = dict()
608 for tag_id in info.get("tags", []):
609 if tag_id in self.STANDARD_TAGGED_BOX_ATTRIBUTES:
610 attr_list = self.STANDARD_TAGGED_BOX_ATTRIBUTES[tag_id]
611 attributes = self._filter_attributes(attr_list,
612 self.STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS)
613 tagged_attributes.update(attributes)
614 return tagged_attributes
616 def _filter_attributes(self, attr_list, definitions):
617 # filter attributes not corresponding to the factory
618 attributes = dict((attr_id, definitions[attr_id]) \
619 for attr_id in attr_list)
622 def _add_attributes(self, add_attr_func, attributes):
623 for attr_id, attr_info in attributes.iteritems():
624 name = attr_info["name"]
625 help = attr_info["help"]
626 type = attr_info["type"]
627 value = attr_info.get("value")
628 range = attr_info.get("range")
629 allowed = attr_info.get("allowed")
630 flags = attr_info.get("flags")
631 validation_function = attr_info["validation_function"]
632 category = attr_info.get("category")
633 add_attr_func(name, help, type, value, range, allowed, flags,
634 validation_function, category)
636 def _add_traces(self, factory, info):
637 for trace_id in info.get("traces", []):
638 trace_info = self._metadata.traces[trace_id]
639 name = trace_info["name"]
640 help = trace_info["help"]
641 factory.add_trace(name, help)
643 def _add_tags(self, factory, info):
644 for tag_id in info.get("tags", []):
645 factory.add_tag(tag_id)
647 def _add_connector_types(self, factory, info):
648 if "connector_types" in info:
649 from_connections = dict()
650 to_connections = dict()
651 for connection in self._metadata.connections:
652 froms = connection["from"]
653 tos = connection["to"]
654 can_cross = connection["can_cross"]
655 init_code = connection.get("init_code")
656 compl_code = connection.get("compl_code")
658 for from_ in _expand(froms):
659 for to in _expand(tos):
660 if from_ not in from_connections:
661 from_connections[from_] = list()
662 if to not in to_connections:
663 to_connections[to] = list()
664 from_connections[from_].append((to, can_cross, init_code,
666 to_connections[to].append((from_, can_cross, init_code,
668 for connector_id in info["connector_types"]:
669 connector_type_info = self._metadata.connector_types[
671 name = connector_type_info["name"]
672 help = connector_type_info["help"]
673 max = connector_type_info["max"]
674 min = connector_type_info["min"]
675 testbed_id = self._testbed_id
676 factory_id = factory.factory_id
677 connector_type = ConnectorType(testbed_id, factory_id, name,
679 connector_key = (testbed_id, factory_id, name)
680 if connector_key in to_connections:
681 for (from_, can_cross, init_code, compl_code) in \
682 to_connections[connector_key]:
683 (testbed_id_from, factory_id_from, name_from) = from_
684 connector_type.add_from_connection(testbed_id_from,
685 factory_id_from, name_from, can_cross,
686 init_code, compl_code)
687 if connector_key in from_connections:
688 for (to, can_cross, init_code, compl_code) in \
689 from_connections[(testbed_id, factory_id, name)]:
690 (testbed_id_to, factory_id_to, name_to) = to
691 connector_type.add_to_connection(testbed_id_to,
692 factory_id_to, name_to, can_cross, init_code,
694 factory.add_connector_type(connector_type)
699 Expands multiple values in the "val" tuple to create cross products:
701 >>> list(_expand((1,2,3)))
703 >>> list(_expand((1,(2,4,5),3)))
704 [(1, 2, 3), (1, 4, 3), (1, 5, 3)]
705 >>> list(_expand(((1,2),(2,4,5),3)))
706 [(1, 2, 3), (1, 4, 3), (1, 5, 3), (2, 2, 3), (2, 4, 3), (2, 5, 3)]
710 elif isinstance(val[0], (list,set,tuple)):
713 for e_val in _expand(val[1:]):
717 for e_val in _expand(val[1:]):