2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
16 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
17 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
18 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20 class TestbedController(object):
def __init__(self, testbed_id, testbed_version):
    """Record the testbed identifier and version on the instance."""
    # Both values are kept in private attributes and only read back
    # through accessors.
    self._testbed_id = testbed_id
    self._testbed_version = testbed_version
27 return self._testbed_id
def testbed_version(self):
    """Return the testbed version that was passed to the constructor."""
    return self._testbed_version
35 raise NotImplementedError
def defer_configure(self, name, value):
    """Instructs setting a configuration attribute for the testbed instance

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def defer_create(self, guid, factory_id):
    """Instructs creation of element

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def defer_create_set(self, guid, name, value):
    """Instructs setting an initial attribute on an element

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def defer_factory_set(self, guid, name, value):
    """Instructs setting an attribute on a factory

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def defer_connect(self, guid1, connector_type_name1, guid2,
        connector_type_name2):
    """Instructs creation of a connection between the given connectors

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def defer_cross_connect(self,
        guid, connector_type_name,
        cross_guid, cross_testbed_guid,
        cross_testbed_id, cross_factory_id,
        cross_connector_type_name):
    """
    Instructs creation of a connection between the given connectors
    of different testbed instances

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def defer_add_trace(self, guid, trace_id):
    """Instructs the addition of a trace

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def defer_add_address(self, guid, address, netprefix, broadcast):
    """Instructs the addition of an address

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
    """Instructs the addition of a route

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
82 """After do_setup the testbed initial configuration is done"""
83 raise NotImplementedError
87 After do_create all instructed elements are created and
90 raise NotImplementedError
def do_connect_init(self):
    """
    After do_connect_init all internal connections between testbed elements
    are expected to be initiated.

    NOTE(review): the tail of the original description is not visible in
    this excerpt; the wording of the second line is assumed -- confirm.

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def do_connect_compl(self):
    """
    After do_connect all internal connections between testbed elements
    are expected to be completed.

    NOTE(review): the tail of the original description is not visible in
    this excerpt; the wording of the second line is assumed -- confirm.

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def do_preconfigure(self):
    """
    Done just before resolving netrefs, after connection, before cross connections,
    useful for early stages of configuration, for setting up stuff that might be
    required for netref resolution.

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def do_configure(self):
    """After do_configure elements are configured

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def do_prestart(self):
    """Before do_start elements are prestart-configured

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def do_cross_connect_init(self, cross_data):
    """
    After do_cross_connect_init initiation of all external connections
    between different testbed elements is performed

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
def do_cross_connect_compl(self, cross_data):
    """
    After do_cross_connect_compl completion of all external connections
    between different testbed elements is performed

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
137 raise NotImplementedError
140 raise NotImplementedError
def set(self, guid, name, value, time = TIME_NOW):
    """Abstract attribute setter; always raises NotImplementedError here.

    Presumably sets attribute `name` of element `guid` to `value` at
    `time` (inferred from the signature -- confirm against implementations).
    """
    raise NotImplementedError
def get(self, guid, name, time = TIME_NOW):
    """Abstract attribute getter; always raises NotImplementedError here.

    Presumably returns attribute `name` of element `guid` at `time`
    (inferred from the signature -- confirm against implementations).
    """
    raise NotImplementedError
def get_route(self, guid, index, attribute):
    """
    Abstract query of a routing entry; always raises NotImplementedError
    in this base class.

    Params:
        guid: guid of box to query
        index: number of routing entry to fetch
        attribute: one of Destination, NextHop, NetPrefix

    NOTE(review): the opening lines of the original description are not
    visible in this excerpt.
    """
    raise NotImplementedError
def get_address(self, guid, index, attribute='Address'):
    """
    Abstract query of an address entry; always raises NotImplementedError
    in this base class.

    Params:
        guid: guid of box to query
        index: number of interface to select
        attribute: one of Address, NetPrefix, Broadcast

    NOTE(review): the opening lines of the original description are not
    visible in this excerpt.
    """
    raise NotImplementedError
def get_attribute_list(self, guid, filter_flags = None):
    """Abstract attribute listing; always raises NotImplementedError here.

    Presumably returns the attribute names of element `guid`, optionally
    restricted by `filter_flags` (inferred from the signature -- confirm).
    """
    raise NotImplementedError
def get_factory_id(self, guid):
    """Abstract factory-id lookup for `guid`; always raises NotImplementedError here."""
    raise NotImplementedError
def action(self, time, guid, action):
    """Abstract action hook; always raises NotImplementedError here.

    NOTE(review): the meaning of `time` and `action` is not visible in this
    excerpt -- document once confirmed against an implementation.
    """
    raise NotImplementedError
def status(self, guid):
    """Abstract status query for `guid`; always raises NotImplementedError here."""
    raise NotImplementedError
def trace(self, guid, trace_id, attribute='value'):
    """Abstract trace query; always raises NotImplementedError here.

    `attribute` defaults to 'value'; which other attribute names are
    accepted is not visible in this excerpt.
    """
    raise NotImplementedError
def traces_info(self):
    """ dictionary of dictionaries:
            ...
                filesize = size in bytes,
            ...

    NOTE(review): only the 'filesize' entry of the original layout
    description is visible in this excerpt; the rest is elided above.

    Abstract: always raises NotImplementedError in this base class.
    """
    raise NotImplementedError
197 raise NotImplementedError
199 class ExperimentController(object):
def __init__(self, experiment_xml, root_dir):
    """Initialise controller state and persist the experiment XML.

    experiment_xml: the experiment description, stored as given.
    root_dir: directory under which the experiment XML is written.
    """
    self._experiment_xml = experiment_xml
    # testbed controllers, keyed by testbed guid
    self._testbeds = dict()
    # per-testbed access configuration, keyed by testbed guid
    self._deployment_config = dict()
    # attribute names still holding unresolved netrefs
    self._netrefs = collections.defaultdict(set)
    self._testbed_netrefs = collections.defaultdict(set)
    self._cross_data = dict()
    self._root_dir = root_dir
    # testbeds whose creation is postponed until their netrefs resolve
    self._netreffed_testbeds = set()
    self._guids_in_testbed_cache = dict()

    # must come last: needs _root_dir and _experiment_xml to be set
    self.persist_experiment_xml()
def experiment_xml(self):
    """Return the experiment XML that was passed to the constructor."""
    return self._experiment_xml
220 for testbed_guid in self._testbeds.keys():
221 _guids = self._guids_in_testbed(testbed_guid)
def persist_experiment_xml(self):
    """Write the experiment XML to ``experiment.xml`` under the root dir.

    An existing file is overwritten.  The file is opened in a ``with``
    block so the handle is flushed and closed even if the write fails.
    """
    xml_path = os.path.join(self._root_dir, "experiment.xml")
    with open(xml_path, "w") as f:
        f.write(self._experiment_xml)
def trace(self, guid, trace_id, attribute='value'):
    """Fetch a trace attribute from the testbed that owns `guid`.

    Delegates to the owning testbed's ``trace(guid, trace_id, attribute)``.

    Raises:
        RuntimeError: if no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard the lookup result: without it an unknown guid dereferences
    # None and the RuntimeError below is never reached.
    if testbed is not None:
        return testbed.trace(guid, trace_id, attribute)
    raise RuntimeError("No element exists with guid %d" % guid)
def traces_info(self):
    """Collect trace information from every testbed.

    Returns a dict mapping testbed guid to whatever that testbed's
    ``traces_info()`` returned.  Testbeds reporting nothing (a falsy
    result) are left out.  Each testbed is queried exactly once.
    """
    traces_info = dict()
    for guid, testbed in self._testbeds.iteritems():
        tinfo = testbed.traces_info()
        # Reuse the value already fetched instead of asking the testbed
        # a second time (the call may be a remote round-trip).
        if tinfo:
            traces_info[guid] = tinfo
    return traces_info
247 def _parallel(callables):
250 @functools.wraps(callable)
251 def wrapped(*p, **kw):
256 traceback.print_exc(file=sys.stderr)
257 excs.append(sys.exc_info())
259 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
260 for thread in threads:
262 for thread in threads:
265 eTyp, eVal, eLoc = exc
266 raise eTyp, eVal, eLoc
269 parser = XmlExperimentParser()
270 data = parser.from_xml_to_data(self._experiment_xml)
271 self._init_testbed_controllers(data)
273 # persist testbed connection data, for potential recovery
274 self._persist_testbed_proxies()
276 def steps_to_configure(self, allowed_guids):
277 # perform setup in parallel for all test beds,
278 # wait for all threads to finish
279 self._parallel([testbed.do_setup
280 for guid,testbed in self._testbeds.iteritems()
281 if guid in allowed_guids])
283 # perform create-connect in parallel, wait
284 # (internal connections only)
285 self._parallel([testbed.do_create
286 for guid,testbed in self._testbeds.iteritems()
287 if guid in allowed_guids])
289 self._parallel([testbed.do_connect_init
290 for guid,testbed in self._testbeds.iteritems()
291 if guid in allowed_guids])
293 self._parallel([testbed.do_connect_compl
294 for guid,testbed in self._testbeds.iteritems()
295 if guid in allowed_guids])
297 self._parallel([testbed.do_preconfigure
298 for guid,testbed in self._testbeds.iteritems()
299 if guid in allowed_guids])
302 steps_to_configure(self, self._testbeds)
304 if self._netreffed_testbeds:
305 # initally resolve netrefs
306 self.do_netrefs(data, fail_if_undefined=False)
308 # rinse and repeat, for netreffed testbeds
309 netreffed_testbeds = set(self._netreffed_testbeds)
311 self._init_testbed_controllers(data)
313 # persist testbed connection data, for potential recovery
314 self._persist_testbed_proxies()
316 # configure dependant testbeds
317 steps_to_configure(self, netreffed_testbeds)
319 # final netref step, fail if anything's left unresolved
320 self.do_netrefs(data, fail_if_undefined=True)
322 self._program_testbed_cross_connections(data)
324 # perform do_configure in parallel for al testbeds
325 # (it's internal configuration for each)
326 self._parallel([testbed.do_configure
327 for testbed in self._testbeds.itervalues()])
331 #print >>sys.stderr, "DO IT"
335 # cross-connect (cannot be done in parallel)
336 for guid, testbed in self._testbeds.iteritems():
337 cross_data = self._get_cross_data(guid)
338 testbed.do_cross_connect_init(cross_data)
339 for guid, testbed in self._testbeds.iteritems():
340 cross_data = self._get_cross_data(guid)
341 testbed.do_cross_connect_compl(cross_data)
345 # Last chance to configure (parallel on all testbeds)
346 self._parallel([testbed.do_prestart
347 for testbed in self._testbeds.itervalues()])
351 # start experiment (parallel start on all testbeds)
352 self._parallel([testbed.start
353 for testbed in self._testbeds.itervalues()])
357 def _clear_caches(self):
358 # Cleaning cache for safety.
359 self._guids_in_testbed_cache = dict()
361 def _persist_testbed_proxies(self):
362 TRANSIENT = ('Recover',)
364 # persist access configuration for all testbeds, so that
365 # recovery mode can reconnect to them if it becomes necessary
366 conf = ConfigParser.RawConfigParser()
367 for testbed_guid, testbed_config in self._deployment_config.iteritems():
368 testbed_guid = str(testbed_guid)
369 conf.add_section(testbed_guid)
370 for attr in testbed_config.get_attribute_list():
371 if attr not in TRANSIENT:
372 conf.set(testbed_guid, attr,
373 testbed_config.get_attribute_value(attr))
375 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
379 def _load_testbed_proxies(self):
384 BOOLEAN : 'getboolean',
387 # deferred import because proxy needs
388 # our class definitions to define proxies
389 import nepi.util.proxy as proxy
391 conf = ConfigParser.RawConfigParser()
392 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
393 for testbed_guid in conf.sections():
394 testbed_config = proxy.AccessConfiguration()
395 for attr in conf.options(testbed_guid):
396 testbed_config.set_attribute_value(attr,
397 conf.get(testbed_guid, attr) )
399 testbed_guid = str(testbed_guid)
400 conf.add_section(testbed_guid)
401 for attr in testbed_config.get_attribute_list():
402 if attr not in TRANSIENT:
403 getter = getattr(conf, TYPEMAP.get(
404 testbed_config.get_attribute_type(attr),
406 testbed_config.set_attribute_value(
407 testbed_guid, attr, getter(attr))
409 def _unpersist_testbed_proxies(self):
411 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
413 # Just print exceptions, this is just cleanup
415 ######## BUG ##########
416 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
417 #traceback.print_exc(file=sys.stderr)
420 for testbed in self._testbeds.values():
422 self._unpersist_testbed_proxies()
425 # reload perviously persisted testbed access configurations
426 self._load_testbed_proxies()
428 # recreate testbed proxies by reconnecting only
429 self._init_testbed_controllers(recover = True)
431 # another time, for netrefs
432 self._init_testbed_controllers(recover = True)
def is_finished(self, guid):
    """Tell whether the element `guid` reports the finished status.

    Returns True when the owning testbed's ``status(guid)`` equals
    ``AS.STATUS_FINISHED``.

    Raises:
        RuntimeError: if no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard the lookup result so an unknown guid yields the RuntimeError
    # below rather than an attribute error on None.
    if testbed is not None:
        return testbed.status(guid) == AS.STATUS_FINISHED
    raise RuntimeError("No element exists with guid %d" % guid)
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute `name` of element `guid` through its owning testbed.

    Delegates to the testbed's ``set(guid, name, value, time)``.

    Raises:
        RuntimeError: if no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # Only an unknown guid is an error; a successful set must not raise.
    if testbed is None:
        raise RuntimeError("No element exists with guid %d" % guid)
    testbed.set(guid, name, value, time)
def get(self, guid, name, time = TIME_NOW):
    """Read attribute `name` of element `guid` from its owning testbed.

    Returns whatever the testbed's ``get(guid, name, time)`` returns.

    Raises:
        RuntimeError: if no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard the lookup result so an unknown guid reaches the RuntimeError.
    if testbed is not None:
        return testbed.get(guid, name, time)
    raise RuntimeError("No element exists with guid %d" % guid)
def get_deferred(self, guid, name, time = TIME_NOW):
    """Forward to ``get_deferred`` on the testbed that owns `guid`.

    Returns whatever the testbed's ``get_deferred(guid, name, time)``
    returns.

    Raises:
        RuntimeError: if no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard the lookup result so an unknown guid reaches the RuntimeError.
    if testbed is not None:
        return testbed.get_deferred(guid, name, time)
    raise RuntimeError("No element exists with guid %d" % guid)
def get_factory_id(self, guid):
    """Return the factory id of element `guid`, as reported by its testbed.

    Raises:
        RuntimeError: if no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard the lookup result so an unknown guid reaches the RuntimeError.
    if testbed is not None:
        return testbed.get_factory_id(guid)
    raise RuntimeError("No element exists with guid %d" % guid)
def get_testbed_id(self, guid):
    """Return the ``testbed_id`` of the testbed that owns element `guid`.

    Raises:
        RuntimeError: if no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard the lookup result so an unknown guid reaches the RuntimeError.
    if testbed is not None:
        return testbed.testbed_id
    raise RuntimeError("No element exists with guid %d" % guid)
def get_testbed_version(self, guid):
    """Return the ``testbed_version`` of the testbed that owns element `guid`.

    Raises:
        RuntimeError: if no testbed contains an element with that guid.
    """
    testbed = self._testbed_for_guid(guid)
    # Guard the lookup result so an unknown guid reaches the RuntimeError.
    if testbed is not None:
        return testbed.testbed_version
    raise RuntimeError("No element exists with guid %d" % guid)
479 for testbed in self._testbeds.values():
483 exceptions.append(sys.exc_info())
484 for exc_info in exceptions:
485 raise exc_info[0], exc_info[1], exc_info[2]
487 def _testbed_for_guid(self, guid):
488 for testbed_guid in self._testbeds.keys():
489 if guid in self._guids_in_testbed(testbed_guid):
490 return self._testbeds[testbed_guid]
493 def _guids_in_testbed(self, testbed_guid):
494 if testbed_guid not in self._testbeds:
496 if testbed_guid not in self._guids_in_testbed_cache:
497 self._guids_in_testbed_cache[testbed_guid] = \
498 set(self._testbeds[testbed_guid].guids)
499 return self._guids_in_testbed_cache[testbed_guid]
def _netref_component_split(component):
    """Split a compound netref component into ``(kind, index)``.

    A component matching ``COMPONENT_PATTERN`` (e.g. ``'addr[0]'``) gives
    ``('addr', '0')``; the index stays a string.  Anything else comes back
    unchanged, paired with None.
    """
    match = COMPONENT_PATTERN.match(component)
    # Only read the groups when the pattern actually matched.
    if match:
        return match.group("kind"), match.group("index")
    return component, None
509 _NETREF_COMPONENT_GETTERS = {
511 lambda testbed, guid, index, name:
512 testbed.get_address(guid, int(index), name),
514 lambda testbed, guid, index, name:
515 testbed.get_route(guid, int(index), name),
517 lambda testbed, guid, index, name:
518 testbed.trace(guid, index, name),
520 lambda testbed, guid, index, name:
521 testbed.get(guid, name),
524 def resolve_netref_value(self, value, failval = None):
525 match = ATTRIBUTE_PATTERN_BASE.search(value)
527 label = match.group("label")
528 if label.startswith('GUID-'):
529 ref_guid = int(label[5:])
531 expr = match.group("expr")
532 component = (match.group("component") or "")[1:] # skip the dot
533 attribute = match.group("attribute")
535 # split compound components into component kind and index
536 # eg: 'addr[0]' -> ('addr', '0')
537 component, component_index = self._netref_component_split(component)
539 # find object and resolve expression
540 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
541 if component not in self._NETREF_COMPONENT_GETTERS:
542 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
543 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
546 ref_value = self._NETREF_COMPONENT_GETTERS[component](
547 ref_testbed, ref_guid, component_index, attribute)
549 return value.replace(match.group(), ref_value)
550 # couldn't find value
553 def do_netrefs(self, data, fail_if_undefined = False):
555 for (testbed_guid, guid), attrs in self._netrefs.items():
556 testbed = self._testbeds.get(testbed_guid)
557 if testbed is not None:
558 for name in set(attrs):
559 value = testbed.get(guid, name)
560 if isinstance(value, basestring):
561 ref_value = self.resolve_netref_value(value)
562 if ref_value is not None:
563 testbed.set(guid, name, ref_value)
565 elif fail_if_undefined:
566 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
568 del self._netrefs[(testbed_guid, guid)]
571 for testbed_guid, attrs in self._testbed_netrefs.items():
572 tb_data = dict(data.get_attribute_data(testbed_guid))
574 for name in set(attrs):
575 value = tb_data.get(name)
576 if isinstance(value, basestring):
577 ref_value = self.resolve_netref_value(value)
578 if ref_value is not None:
579 data.set_attribute_data(testbed_guid, name, ref_value)
581 elif fail_if_undefined:
582 raise ValueError, "Unresolvable netref in: %r" % (value,)
584 del self._testbed_netrefs[testbed_guid]
587 def _init_testbed_controllers(self, data, recover = False):
588 blacklist_testbeds = set(self._testbeds)
589 element_guids = list()
591 data_guids = data.guids
593 # create testbed controllers
594 for guid in data_guids:
595 if data.is_testbed_data(guid):
596 if guid not in self._testbeds:
597 self._create_testbed_controller(guid, data, element_guids,
600 (testbed_guid, factory_id) = data.get_box_data(guid)
601 if testbed_guid not in blacklist_testbeds:
602 element_guids.append(guid)
603 label = data.get_attribute_data(guid, "label")
604 if label is not None:
605 if label in label_guids:
606 raise RuntimeError, "Label %r is not unique" % (label,)
607 label_guids[label] = guid
609 # replace references to elements labels for its guid
610 self._resolve_labels(data, data_guids, label_guids)
612 # program testbed controllers
614 self._program_testbed_controllers(element_guids, data)
616 def _resolve_labels(self, data, data_guids, label_guids):
617 netrefs = self._netrefs
618 testbed_netrefs = self._testbed_netrefs
619 for guid in data_guids:
620 for name, value in data.get_attribute_data(guid):
621 if isinstance(value, basestring):
622 match = ATTRIBUTE_PATTERN_BASE.search(value)
624 label = match.group("label")
625 if not label.startswith('GUID-'):
626 ref_guid = label_guids.get(label)
627 if ref_guid is not None:
628 value = ATTRIBUTE_PATTERN_BASE.sub(
629 ATTRIBUTE_PATTERN_GUID_SUB % dict(
630 guid = 'GUID-%d' % (ref_guid,),
631 expr = match.group("expr"),
634 data.set_attribute_data(guid, name, value)
636 # memorize which guid-attribute pairs require
637 # postprocessing, to avoid excessive controller-testbed
638 # communication at configuration time
639 # (which could require high-latency network I/O)
640 if not data.is_testbed_data(guid):
641 (testbed_guid, factory_id) = data.get_box_data(guid)
642 netrefs[(testbed_guid, guid)].add(name)
644 testbed_netrefs[guid].add(name)
646 def _create_testbed_controller(self, guid, data, element_guids, recover):
647 (testbed_id, testbed_version) = data.get_testbed_data(guid)
648 deployment_config = self._deployment_config.get(guid)
650 # deferred import because proxy needs
651 # our class definitions to define proxies
652 import nepi.util.proxy as proxy
654 if deployment_config is None:
656 deployment_config = proxy.AccessConfiguration()
658 for (name, value) in data.get_attribute_data(guid):
659 if value is not None and deployment_config.has_attribute(name):
660 # if any deployment config attribute has a netref, we can't
661 # create this controller yet
662 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
663 # remember to re-issue this one
664 self._netreffed_testbeds.add(guid)
667 # copy deployment config attribute
668 deployment_config.set_attribute_value(name, value)
671 self._deployment_config[guid] = deployment_config
673 if deployment_config is not None:
674 # force recovery mode
675 deployment_config.set_attribute_value("recover",recover)
677 testbed = proxy.create_testbed_controller(testbed_id,
678 testbed_version, deployment_config)
679 for (name, value) in data.get_attribute_data(guid):
680 testbed.defer_configure(name, value)
681 self._testbeds[guid] = testbed
682 if guid in self._netreffed_testbeds:
683 self._netreffed_testbeds.remove(guid)
685 def _program_testbed_controllers(self, element_guids, data):
686 for guid in element_guids:
687 (testbed_guid, factory_id) = data.get_box_data(guid)
688 testbed = self._testbeds.get(testbed_guid)
690 testbed.defer_create(guid, factory_id)
691 for (name, value) in data.get_attribute_data(guid):
692 # Try to resolve create-time netrefs, if possible
693 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
695 nuvalue = self.resolve_netref_value(value)
697 # Any trouble means we're not in shape to resolve the netref yet
699 if nuvalue is not None:
700 # Only if we succeed we remove the netref deferral entry
702 data.set_attribute_data(guid, name, value)
703 if (testbed_guid, guid) in self._netrefs:
704 self._netrefs[(testbed_guid, guid)].discard(name)
705 testbed.defer_create_set(guid, name, value)
707 for guid in element_guids:
708 (testbed_guid, factory_id) = data.get_box_data(guid)
709 testbed = self._testbeds.get(testbed_guid)
711 for (connector_type_name, cross_guid, cross_connector_type_name) \
712 in data.get_connection_data(guid):
713 (testbed_guid, factory_id) = data.get_box_data(guid)
714 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
716 if testbed_guid == cross_testbed_guid:
717 testbed.defer_connect(guid, connector_type_name,
718 cross_guid, cross_connector_type_name)
719 for trace_id in data.get_trace_data(guid):
720 testbed.defer_add_trace(guid, trace_id)
721 for (address, netprefix, broadcast) in \
722 data.get_address_data(guid):
724 testbed.defer_add_address(guid, address, netprefix,
726 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
727 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
729 def _program_testbed_cross_connections(self, data):
730 data_guids = data.guids
732 for guid in data_guids:
733 if not data.is_testbed_data(guid):
734 (testbed_guid, factory_id) = data.get_box_data(guid)
735 testbed = self._testbeds.get(testbed_guid)
737 for (connector_type_name, cross_guid, cross_connector_type_name) \
738 in data.get_connection_data(guid):
739 (testbed_guid, factory_id) = data.get_box_data(guid)
740 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
742 if testbed_guid != cross_testbed_guid:
743 cross_testbed = self._testbeds[cross_testbed_guid]
744 cross_testbed_id = cross_testbed.testbed_id
745 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
746 cross_testbed_guid, cross_testbed_id, cross_factory_id,
747 cross_connector_type_name)
748 # save cross data for later
749 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
752 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
753 if testbed_guid not in self._cross_data:
754 self._cross_data[testbed_guid] = dict()
755 if cross_testbed_guid not in self._cross_data[testbed_guid]:
756 self._cross_data[testbed_guid][cross_testbed_guid] = set()
757 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
759 def _get_cross_data(self, testbed_guid):
761 if not testbed_guid in self._cross_data:
763 for cross_testbed_guid, guid_list in \
764 self._cross_data[testbed_guid].iteritems():
765 cross_data[cross_testbed_guid] = dict()
766 cross_testbed = self._testbeds[cross_testbed_guid]
767 for cross_guid in guid_list:
768 elem_cross_data = dict(
770 _testbed_guid = cross_testbed_guid,
771 _testbed_id = cross_testbed.testbed_id,
772 _testbed_version = cross_testbed.testbed_version)
773 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
774 attribute_list = cross_testbed.get_attribute_list(cross_guid,
775 filter_flags = Attribute.DesignOnly)
776 for attr_name in attribute_list:
777 attr_value = cross_testbed.get(cross_guid, attr_name)
778 elem_cross_data[attr_name] = attr_value