2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.core.connector import ConnectorType
6 from nepi.core.factory import Factory
9 import nepi.util.environ
10 from nepi.util import tags, validation
11 from nepi.util.constants import ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP, \
12 DeploymentConfiguration as DC, \
13 AttributeCategories as AC
15 class Parallel(object):
16 def __init__(self, factory, maxthreads = 64):
17 self.factory = factory
18 self.maxthreads = maxthreads
20 class MetadataInfo(object):
22 def connector_types(self):
23 """ dictionary of dictionaries with allowed connection information.
26 "name": connector type name,
27 "max": maximum number of connections allowed (-1 for no limit),
28 "min": minimum number of connections allowed
31 raise NotImplementedError
34 def connections(self):
35 """ array of dictionaries with allowed connection information.
37 "from": (testbed_id1, factory_id1, connector_type_name1),
38 "to": (testbed_id2, factory_id2, connector_type_name2),
39 "init_code": connection function to invoke for connection initiation
40 "compl_code": connection function to invoke for connection
42 "can_cross": whether the connection can be done across testbed
46 raise NotImplementedError
50 """ dictionary of dictionaries of all available attributes.
52 "name": attribute name,
54 "type": attribute type,
55 "value": default attribute value,
56 "range": (maximum, minimun) values else None if not defined,
57 "allowed": array of posible values,
58 "flags": attributes flags,
59 "validation_function": validation function for the attribute
60 "category": category for the attribute
63 raise NotImplementedError
67 """ dictionary of dictionaries of all available traces.
73 raise NotImplementedError
76 def create_order(self):
77 """ list of factory ids that indicates the order in which the elements
78 should be instantiated. If wrapped within a Parallel instance, they
79 will be instantiated in parallel.
81 raise NotImplementedError
84 def configure_order(self):
85 """ list of factory ids that indicates the order in which the elements
86 should be configured. If wrapped within a Parallel instance, they
87 will be configured in parallel.
89 raise NotImplementedError
92 def preconfigure_order(self):
93 """ list of factory ids that indicates the order in which the elements
94 should be preconfigured. If wrapped within a Parallel instance, they
95 will be configured in parallel.
97 Default: same as configure_order
99 return self.configure_order
102 def prestart_order(self):
103 """ list of factory ids that indicates the order in which the elements
104 should be prestart-configured. If wrapped within a Parallel instance, they
105 will be configured in parallel.
107 Default: same as configure_order
109 return self.configure_order
112 def start_order(self):
113 """ list of factory ids that indicates the order in which the elements
114 should be started. If wrapped within a Parallel instance, they
115 will be started in parallel.
117 Default: same as configure_order
119 return self.configure_order
122 def factories_info(self):
123 """ dictionary of dictionaries of factory specific information
126 "category": category the element belongs to,
127 "create_function": function for element instantiation,
128 "start_function": function for element starting,
129 "stop_function": function for element stoping,
130 "status_function": function for retrieving element status,
131 "preconfigure_function": function for element preconfiguration,
132 (just after connections are made,
133 just before netrefs are resolved)
134 "configure_function": function for element configuration,
135 "prestart_function": function for pre-start
136 element configuration (just before starting applications),
137 useful for synchronization of background setup tasks or
138 lazy instantiation or configuration of attributes
139 that require connection/cross-connection state before
141 After this point, all applications should be able to run.
142 "factory_attributes": list of references to attribute_ids,
143 "box_attributes": list of regerences to attribute_ids,
144 "traces": list of references to trace_id
145 "tags": list of references to tag_id
146 "connector_types": list of references to connector_types
149 raise NotImplementedError
152 def testbed_attributes(self):
153 """ dictionary of attributes for testbed instance configuration
154 attributes_id = dict({
155 "name": attribute name,
157 "type": attribute type,
158 "value": default attribute value,
159 "range": (maximum, minimun) values else None if not defined,
160 "allowed": array of posible values,
161 "flags": attributes flags,
162 "validation_function": validation function for the attribute
163 "category": category for the attribute
167 raise NotImplementedError
170 def testbed_id(self):
171 """ ID for the testbed """
172 raise NotImplementedError
175 def testbed_version(self):
176 """ version for the testbed """
177 raise NotImplementedError
179 class Metadata(object):
180 # These attributes should be added to all boxes
181 STANDARD_BOX_ATTRIBUTES = dict({
184 "validation_function" : validation.is_string,
185 "type" : Attribute.STRING,
186 "flags" : Attribute.ExecReadOnly |\
187 Attribute.ExecImmutable |\
189 "help" : "A unique identifier for referring to this box",
193 # These are the attribute definitions for tagged attributes
194 STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS = dict({
195 "maxAddresses" : dict({
196 "name" : "maxAddresses",
197 "validation_function" : validation.is_integer,
198 "type" : Attribute.INTEGER,
200 "flags" : Attribute.DesignReadOnly |\
201 Attribute.ExecInvisible |\
203 "help" : "The maximum allowed number of addresses",
207 # Attributes to be added to all boxes with specific tags
208 STANDARD_TAGGED_BOX_ATTRIBUTES = dict({
209 tags.ALLOW_ADDRESSES : ["maxAddresses"],
210 tags.HAS_ADDRESSES : ["maxAddresses"],
213 # These attributes should be added to all testbeds
214 STANDARD_TESTBED_ATTRIBUTES = dict({
215 "home_directory" : dict({
216 "name" : "homeDirectory",
217 "validation_function" : validation.is_string,
218 "help" : "Path to the directory where traces and other files will be stored",
219 "type" : Attribute.STRING,
221 "flags" : Attribute.ExecReadOnly |\
222 Attribute.ExecImmutable |\
227 "validation_function" : validation.is_string,
228 "type" : Attribute.STRING,
229 "flags" : Attribute.ExecReadOnly |\
230 Attribute.ExecImmutable |\
232 "help" : "A unique identifier for referring to this testbed",
236 # These attributes should be added to all testbeds
237 DEPLOYMENT_ATTRIBUTES = dict({
238 # TESTBED DEPLOYMENT ATTRIBUTES
239 DC.DEPLOYMENT_ENVIRONMENT_SETUP : dict({
240 "name" : DC.DEPLOYMENT_ENVIRONMENT_SETUP,
241 "validation_function" : validation.is_string,
242 "help" : "Shell commands to run before spawning TestbedController processes",
243 "type" : Attribute.STRING,
244 "flags" : Attribute.ExecReadOnly |\
245 Attribute.ExecImmutable |\
247 "category" : AC.CATEGORY_DEPLOYMENT,
249 DC.DEPLOYMENT_MODE: dict({
250 "name" : DC.DEPLOYMENT_MODE,
251 "help" : "Instance execution mode",
252 "type" : Attribute.ENUM,
253 "value" : DC.MODE_SINGLE_PROCESS,
256 DC.MODE_SINGLE_PROCESS
258 "flags" : Attribute.ExecReadOnly |\
259 Attribute.ExecImmutable |\
261 "validation_function" : validation.is_enum,
262 "category" : AC.CATEGORY_DEPLOYMENT,
264 DC.DEPLOYMENT_COMMUNICATION : dict({
265 "name" : DC.DEPLOYMENT_COMMUNICATION,
266 "help" : "Instance communication mode",
267 "type" : Attribute.ENUM,
268 "value" : DC.ACCESS_LOCAL,
273 "flags" : Attribute.ExecReadOnly |\
274 Attribute.ExecImmutable |\
276 "validation_function" : validation.is_enum,
277 "category" : AC.CATEGORY_DEPLOYMENT,
279 DC.DEPLOYMENT_HOST : dict({
280 "name" : DC.DEPLOYMENT_HOST,
281 "help" : "Host where the testbed will be executed",
282 "type" : Attribute.STRING,
283 "value" : "localhost",
284 "flags" : Attribute.ExecReadOnly |\
285 Attribute.ExecImmutable |\
287 "validation_function" : validation.is_string,
288 "category" : AC.CATEGORY_DEPLOYMENT,
290 DC.DEPLOYMENT_USER : dict({
291 "name" : DC.DEPLOYMENT_USER,
292 "help" : "User on the Host to execute the testbed",
293 "type" : Attribute.STRING,
294 "value" : getpass.getuser(),
295 "flags" : Attribute.ExecReadOnly |\
296 Attribute.ExecImmutable |\
298 "validation_function" : validation.is_string,
299 "category" : AC.CATEGORY_DEPLOYMENT,
301 DC.DEPLOYMENT_KEY : dict({
302 "name" : DC.DEPLOYMENT_KEY,
303 "help" : "Path to SSH key to use for connecting",
304 "type" : Attribute.STRING,
305 "flags" : Attribute.ExecReadOnly |\
306 Attribute.ExecImmutable |\
308 "validation_function" : validation.is_string,
309 "category" : AC.CATEGORY_DEPLOYMENT,
311 DC.DEPLOYMENT_PORT : dict({
312 "name" : DC.DEPLOYMENT_PORT,
313 "help" : "Port on the Host",
314 "type" : Attribute.INTEGER,
316 "flags" : Attribute.ExecReadOnly |\
317 Attribute.ExecImmutable |\
319 "validation_function" : validation.is_integer,
320 "category" : AC.CATEGORY_DEPLOYMENT,
322 DC.ROOT_DIRECTORY : dict({
323 "name" : DC.ROOT_DIRECTORY,
324 "help" : "Root directory for storing process files",
325 "type" : Attribute.STRING,
327 "flags" : Attribute.ExecReadOnly |\
328 Attribute.ExecImmutable |\
330 "validation_function" : validation.is_string, # TODO: validation.is_path
331 "category" : AC.CATEGORY_DEPLOYMENT,
333 DC.USE_AGENT : dict({
334 "name" : DC.USE_AGENT,
335 "help" : "Use -A option for forwarding of the authentication agent, if ssh access is used",
336 "type" : Attribute.BOOL,
338 "flags" : Attribute.ExecReadOnly |\
339 Attribute.ExecImmutable |\
341 "validation_function" : validation.is_bool,
342 "category" : AC.CATEGORY_DEPLOYMENT,
345 "name" : DC.USE_SUDO,
346 "help" : "Use sudo to run the deamon process. This option only take flace when the server runs in daemon mode.",
347 "type" : Attribute.BOOL,
349 "flags" : Attribute.ExecReadOnly |\
350 Attribute.ExecImmutable |\
352 "validation_function" : validation.is_bool,
353 "category" : AC.CATEGORY_DEPLOYMENT,
355 DC.CLEAN_ROOT : dict({
356 "name" : DC.CLEAN_ROOT,
357 "help" : "Clean server root directory (Warning: This will erase previous data).",
358 "type" : Attribute.BOOL,
360 "flags" : Attribute.ExecReadOnly |\
361 Attribute.ExecImmutable |\
363 "validation_function" : validation.is_bool,
364 "category" : AC.CATEGORY_DEPLOYMENT,
366 DC.LOG_LEVEL : dict({
367 "name" : DC.LOG_LEVEL,
368 "help" : "Log level for instance",
369 "type" : Attribute.ENUM,
370 "value" : DC.ERROR_LEVEL,
375 "flags" : Attribute.ExecReadOnly |\
376 Attribute.ExecImmutable |\
378 "validation_function" : validation.is_enum,
379 "category" : AC.CATEGORY_DEPLOYMENT,
381 DC.RECOVERY_POLICY : dict({
382 "name" : DC.RECOVERY_POLICY,
383 "help" : "Specifies what action to take in the event of a failure.",
384 "type" : Attribute.ENUM,
385 "value" : DC.POLICY_FAIL,
391 "flags" : Attribute.ExecReadOnly |\
392 Attribute.ExecImmutable |\
394 "validation_function" : validation.is_enum,
395 "category" : AC.CATEGORY_DEPLOYMENT,
398 PROXY_ATTRIBUTES = dict({
401 "help" : "Do not intantiate testbeds, rather, reconnect to already-running instances. Used to recover from a dead controller.",
402 "type" : Attribute.BOOL,
404 "flags" : Attribute.ExecReadOnly |\
405 Attribute.ExecImmutable |\
407 "validation_function" : validation.is_bool,
408 "category" : AC.CATEGORY_DEPLOYMENT,
411 PROXY_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES)
413 # These attributes could appear in the boxes attribute list
414 STANDARD_BOX_ATTRIBUTE_DEFINITIONS = dict({
416 "name" : "tun_proto",
417 "help" : "TUNneling protocol used",
418 "type" : Attribute.STRING,
419 "flags" : Attribute.DesignInvisible | \
420 Attribute.ExecInvisible | \
421 Attribute.ExecImmutable | \
423 "validation_function" : validation.is_string,
427 "help" : "Randomly selected TUNneling protocol cryptographic key. "
428 "Endpoints must agree to use the minimum (in lexicographic order) "
429 "of both the remote and local sides.",
430 "type" : Attribute.STRING,
431 "flags" : Attribute.DesignInvisible | \
432 Attribute.ExecInvisible | \
433 Attribute.ExecImmutable | \
435 "validation_function" : validation.is_string,
439 "help" : "Address (IP, unix socket, whatever) of the tunnel endpoint",
440 "type" : Attribute.STRING,
441 "flags" : Attribute.DesignInvisible | \
442 Attribute.ExecInvisible | \
443 Attribute.ExecImmutable | \
445 "validation_function" : validation.is_string,
449 "help" : "IP port of the tunnel endpoint",
450 "type" : Attribute.INTEGER,
451 "flags" : Attribute.DesignInvisible | \
452 Attribute.ExecInvisible | \
453 Attribute.ExecImmutable | \
455 "validation_function" : validation.is_integer,
457 "tun_cipher" : dict({
458 "name" : "tun_cipher",
459 "help" : "Cryptographic cipher used for tunnelling",
460 "type" : Attribute.ENUM,
469 "flags" : Attribute.ExecImmutable,
470 "validation_function" : validation.is_enum,
472 ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP : dict({
473 "name" : ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
474 "help" : "Commands to set up the environment needed to run NEPI testbeds",
475 "type" : Attribute.STRING,
476 "flags" : Attribute.DesignInvisible | \
477 Attribute.ExecInvisible | \
478 Attribute.ExecImmutable | \
480 "validation_function" : validation.is_string
484 STANDARD_TESTBED_ATTRIBUTES.update(DEPLOYMENT_ATTRIBUTES.copy())
486 def __init__(self, testbed_id):
487 self._testbed_id = testbed_id
488 metadata_module = self._load_metadata_module()
489 self._metadata = metadata_module.MetadataInfo()
490 if testbed_id != self._metadata.testbed_id:
491 raise RuntimeError("Bad testbed id. Asked for %s, got %s" % \
492 (testbed_id, self._metadata.testbed_id ))
495 def create_order(self):
496 return self._metadata.create_order
499 def configure_order(self):
500 return self._metadata.configure_order
503 def preconfigure_order(self):
504 return self._metadata.preconfigure_order
507 def prestart_order(self):
508 return self._metadata.prestart_order
511 def start_order(self):
512 return self._metadata.start_order
515 def testbed_version(self):
516 return self._metadata.testbed_version
519 def testbed_id(self):
520 return self._testbed_id
523 def supported_recovery_policies(self):
524 return self._metadata.supported_recovery_policies
526 def testbed_attributes(self):
527 attributes = AttributesMap()
528 testbed_attributes = self._testbed_attributes()
529 self._add_attributes(attributes.add_attribute, testbed_attributes)
532 def build_factories(self):
534 for factory_id, info in self._metadata.factories_info.iteritems():
535 create_function = info.get("create_function")
536 start_function = info.get("start_function")
537 stop_function = info.get("stop_function")
538 status_function = info.get("status_function")
539 configure_function = info.get("configure_function")
540 preconfigure_function = info.get("preconfigure_function")
541 prestart_function = info.get("prestart_function")
543 category = info["category"]
544 factory = Factory(factory_id,
550 preconfigure_function,
555 factory_attributes = self._factory_attributes(info)
556 self._add_attributes(factory.add_attribute, factory_attributes)
557 box_attributes = self._box_attributes(info)
558 self._add_attributes(factory.add_box_attribute, box_attributes)
560 self._add_traces(factory, info)
561 self._add_tags(factory, info)
562 self._add_connector_types(factory, info)
563 factories.append(factory)
566 def _load_metadata_module(self):
567 mod_name = nepi.util.environ.find_testbed(self._testbed_id) + ".metadata"
568 if not mod_name in sys.modules:
570 return sys.modules[mod_name]
572 def _testbed_attributes(self):
574 attributes = self.STANDARD_TESTBED_ATTRIBUTES.copy()
576 attributes.update(self._metadata.testbed_attributes.copy())
579 def _factory_attributes(self, info):
580 tagged_attributes = self._tagged_attributes(info)
581 if "factory_attributes" in info:
582 definitions = self._metadata.attributes.copy()
583 # filter attributes corresponding to the factory_id
584 factory_attributes = self._filter_attributes(info["factory_attributes"],
587 factory_attributes = dict()
588 attributes = dict(tagged_attributes.items() + \
589 factory_attributes.items())
592 def _box_attributes(self, info):
593 tagged_attributes = self._tagged_attributes(info)
594 if "box_attributes" in info:
595 definitions = self.STANDARD_BOX_ATTRIBUTE_DEFINITIONS.copy()
596 definitions.update(self._metadata.attributes)
597 box_attributes = self._filter_attributes(info["box_attributes"],
600 box_attributes = dict()
601 attributes = dict(tagged_attributes.items() + \
602 box_attributes.items())
603 attributes.update(self.STANDARD_BOX_ATTRIBUTES.copy())
606 def _tagged_attributes(self, info):
607 tagged_attributes = dict()
608 for tag_id in info.get("tags", []):
609 if tag_id in self.STANDARD_TAGGED_BOX_ATTRIBUTES:
610 attr_list = self.STANDARD_TAGGED_BOX_ATTRIBUTES[tag_id]
611 attributes = self._filter_attributes(attr_list,
612 self.STANDARD_TAGGED_ATTRIBUTES_DEFINITIONS)
613 tagged_attributes.update(attributes)
614 return tagged_attributes
616 def _filter_attributes(self, attr_list, definitions):
617 # filter attributes not corresponding to the factory
618 attributes = dict((attr_id, definitions[attr_id]) \
619 for attr_id in attr_list)
622 def _add_attributes(self, add_attr_func, attributes):
623 for attr_id, attr_info in attributes.iteritems():
624 name = attr_info["name"]
625 help = attr_info["help"]
626 type = attr_info["type"]
627 value = attr_info.get("value")
628 range = attr_info.get("range")
629 allowed = attr_info.get("allowed")
630 flags = attr_info.get("flags")
631 validation_function = attr_info["validation_function"]
632 category = attr_info.get("category")
633 add_attr_func(name, help, type, value, range, allowed, flags,
634 validation_function, category)
636 def _add_traces(self, factory, info):
637 for trace_id in info.get("traces", []):
638 trace_info = self._metadata.traces[trace_id]
639 name = trace_info["name"]
640 help = trace_info["help"]
641 factory.add_trace(name, help)
643 def _add_tags(self, factory, info):
644 for tag_id in info.get("tags", []):
645 factory.add_tag(tag_id)
647 def _add_connector_types(self, factory, info):
648 if "connector_types" in info:
649 from_connections = dict()
650 to_connections = dict()
651 for connection in self._metadata.connections:
652 froms = connection["from"]
653 tos = connection["to"]
654 can_cross = connection["can_cross"]
655 init_code = connection.get("init_code")
656 compl_code = connection.get("compl_code")
658 for from_ in _expand(froms):
659 for to in _expand(tos):
660 if from_ not in from_connections:
661 from_connections[from_] = list()
662 if to not in to_connections:
663 to_connections[to] = list()
664 from_connections[from_].append((to, can_cross, init_code,
666 to_connections[to].append((from_, can_cross, init_code,
668 for connector_id in info["connector_types"]:
669 connector_type_info = self._metadata.connector_types[
671 name = connector_type_info["name"]
672 help = connector_type_info["help"]
673 max = connector_type_info["max"]
674 min = connector_type_info["min"]
675 testbed_id = self._testbed_id
676 factory_id = factory.factory_id
677 connector_type = ConnectorType(testbed_id, factory_id, name,
679 connector_key = (testbed_id, factory_id, name)
680 if connector_key in to_connections:
681 for (from_, can_cross, init_code, compl_code) in \
682 to_connections[connector_key]:
683 (testbed_id_from, factory_id_from, name_from) = from_
684 connector_type.add_from_connection(testbed_id_from,
685 factory_id_from, name_from, can_cross,
686 init_code, compl_code)
687 if connector_key in from_connections:
688 for (to, can_cross, init_code, compl_code) in \
689 from_connections[(testbed_id, factory_id, name)]:
690 (testbed_id_to, factory_id_to, name_to) = to
691 connector_type.add_to_connection(testbed_id_to,
692 factory_id_to, name_to, can_cross, init_code,
694 factory.add_connector_type(connector_type)
699 Expands multiple values in the "val" tuple to create cross products:
701 >>> list(_expand((1,2,3)))
703 >>> list(_expand((1,(2,4,5),3)))
704 [(1, 2, 3), (1, 4, 3), (1, 5, 3)]
705 >>> list(_expand(((1,2),(2,4,5),3)))
706 [(1, 2, 3), (1, 4, 3), (1, 5, 3), (2, 2, 3), (2, 4, 3), (2, 5, 3)]
710 elif isinstance(val[0], (list,set,tuple)):
713 for e_val in _expand(val[1:]):
717 for e_val in _expand(val[1:]):