2 # -*- coding: utf-8 -*-
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
16 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
17 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
18 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20 class TestbedController(object):
21 def __init__(self, testbed_id, testbed_version):
22 self._testbed_id = testbed_id
23 self._testbed_version = testbed_version
27 return self._testbed_id
30 def testbed_version(self):
31 return self._testbed_version
35 raise NotImplementedError
37 def defer_configure(self, name, value):
38 """Instructs setting a configuartion attribute for the testbed instance"""
39 raise NotImplementedError
41 def defer_create(self, guid, factory_id):
42 """Instructs creation of element """
43 raise NotImplementedError
45 def defer_create_set(self, guid, name, value):
46 """Instructs setting an initial attribute on an element"""
47 raise NotImplementedError
49 def defer_factory_set(self, guid, name, value):
50 """Instructs setting an attribute on a factory"""
51 raise NotImplementedError
53 def defer_connect(self, guid1, connector_type_name1, guid2,
54 connector_type_name2):
55 """Instructs creation of a connection between the given connectors"""
56 raise NotImplementedError
58 def defer_cross_connect(self,
59 guid, connector_type_name,
60 cross_guid, cross_testbed_guid,
61 cross_testbed_id, cross_factory_id,
62 cross_connector_type_name):
64 Instructs creation of a connection between the given connectors
65 of different testbed instances
67 raise NotImplementedError
69 def defer_add_trace(self, guid, trace_id):
70 """Instructs the addition of a trace"""
71 raise NotImplementedError
73 def defer_add_address(self, guid, address, netprefix, broadcast):
74 """Instructs the addition of an address"""
75 raise NotImplementedError
77 def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
78 """Instructs the addition of a route"""
79 raise NotImplementedError
82 """After do_setup the testbed initial configuration is done"""
83 raise NotImplementedError
87 After do_create all instructed elements are created and
90 raise NotImplementedError
92 def do_connect_init(self):
94 After do_connect_init all internal connections between testbed elements
97 raise NotImplementedError
99 def do_connect_compl(self):
101 After do_connect all internal connections between testbed elements
104 raise NotImplementedError
106 def do_preconfigure(self):
108 Done just before resolving netrefs, after connection, before cross connections,
109 useful for early stages of configuration, for setting up stuff that might be
110 required for netref resolution.
112 raise NotImplementedError
114 def do_configure(self):
115 """After do_configure elements are configured"""
116 raise NotImplementedError
118 def do_prestart(self):
119 """Before do_start elements are prestart-configured"""
120 raise NotImplementedError
122 def do_cross_connect_init(self, cross_data):
124 After do_cross_connect_init initiation of all external connections
125 between different testbed elements is performed
127 raise NotImplementedError
129 def do_cross_connect_compl(self, cross_data):
131 After do_cross_connect_compl completion of all external connections
132 between different testbed elements is performed
134 raise NotImplementedError
137 raise NotImplementedError
140 raise NotImplementedError
142 def set(self, guid, name, value, time = TIME_NOW):
143 raise NotImplementedError
145 def get(self, guid, name, time = TIME_NOW):
146 raise NotImplementedError
148 def get_route(self, guid, index, attribute):
152 guid: guid of box to query
153 index: number of routing entry to fetch
154 attribute: one of Destination, NextHop, NetPrefix
156 raise NotImplementedError
158 def get_address(self, guid, index, attribute='Address'):
162 guid: guid of box to query
163 index: number of inteface to select
164 attribute: one of Address, NetPrefix, Broadcast
166 raise NotImplementedError
168 def get_attribute_list(self, guid, filter_flags = None):
169 raise NotImplementedError
171 def get_factory_id(self, guid):
172 raise NotImplementedError
174 def action(self, time, guid, action):
175 raise NotImplementedError
177 def status(self, guid):
178 raise NotImplementedError
180 def trace(self, guid, trace_id, attribute='value'):
181 raise NotImplementedError
183 def traces_info(self):
184 """ dictionary of dictionaries:
190 filesize = size in bytes,
194 raise NotImplementedError
197 raise NotImplementedError
199 class ExperimentController(object):
200 def __init__(self, experiment_xml, root_dir):
201 self._experiment_xml = experiment_xml
202 self._testbeds = dict()
203 self._deployment_config = dict()
204 self._netrefs = collections.defaultdict(set)
205 self._testbed_netrefs = collections.defaultdict(set)
206 self._cross_data = dict()
207 self._root_dir = root_dir
208 self._netreffed_testbeds = set()
209 self._guids_in_testbed_cache = dict()
211 self.persist_experiment_xml()
214 def experiment_xml(self):
215 return self._experiment_xml
220 for testbed_guid in self._testbeds.keys():
221 _guids = self._guids_in_testbed(testbed_guid)
226 def persist_experiment_xml(self):
227 xml_path = os.path.join(self._root_dir, "experiment.xml")
228 f = open(xml_path, "w")
229 f.write(self._experiment_xml)
232 def trace(self, guid, trace_id, attribute='value'):
233 testbed = self._testbed_for_guid(guid)
235 return testbed.trace(guid, trace_id, attribute)
236 raise RuntimeError("No element exists with guid %d" % guid)
238 def traces_info(self):
240 for guid, testbed in self._testbeds.iteritems():
241 tinfo = testbed.traces_info()
243 traces_info[guid] = testbed.traces_info()
247 def _parallel(callables):
250 @functools.wraps(callable)
251 def wrapped(*p, **kw):
256 traceback.print_exc(file=sys.stderr)
257 excs.append(sys.exc_info())
259 threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
260 for thread in threads:
262 for thread in threads:
265 eTyp, eVal, eLoc = exc
266 raise eTyp, eVal, eLoc
269 parser = XmlExperimentParser()
270 data = parser.from_xml_to_data(self._experiment_xml)
272 self._init_testbed_controllers(data)
274 # persist testbed connection data, for potential recovery
275 self._persist_testbed_proxies()
277 def steps_to_configure(self, allowed_guids):
278 # perform setup in parallel for all test beds,
279 # wait for all threads to finish
280 self._parallel([testbed.do_setup
281 for guid,testbed in self._testbeds.iteritems()
282 if guid in allowed_guids])
284 # perform create-connect in parallel, wait
285 # (internal connections only)
286 self._parallel([testbed.do_create
287 for guid,testbed in self._testbeds.iteritems()
288 if guid in allowed_guids])
290 self._parallel([testbed.do_connect_init
291 for guid,testbed in self._testbeds.iteritems()
292 if guid in allowed_guids])
294 self._parallel([testbed.do_connect_compl
295 for guid,testbed in self._testbeds.iteritems()
296 if guid in allowed_guids])
298 self._parallel([testbed.do_preconfigure
299 for guid,testbed in self._testbeds.iteritems()
300 if guid in allowed_guids])
303 steps_to_configure(self, self._testbeds)
305 if self._netreffed_testbeds:
306 # initally resolve netrefs
307 self.do_netrefs(data, fail_if_undefined=False)
309 # rinse and repeat, for netreffed testbeds
310 netreffed_testbeds = set(self._netreffed_testbeds)
312 self._init_testbed_controllers(data)
314 # persist testbed connection data, for potential recovery
315 self._persist_testbed_proxies()
317 # configure dependant testbeds
318 steps_to_configure(self, netreffed_testbeds)
320 # final netref step, fail if anything's left unresolved
321 self.do_netrefs(data, fail_if_undefined=True)
323 self._program_testbed_cross_connections(data)
325 # perform do_configure in parallel for al testbeds
326 # (it's internal configuration for each)
327 self._parallel([testbed.do_configure
328 for testbed in self._testbeds.itervalues()])
332 #print >>sys.stderr, "DO IT"
336 # cross-connect (cannot be done in parallel)
337 for guid, testbed in self._testbeds.iteritems():
338 cross_data = self._get_cross_data(guid)
339 testbed.do_cross_connect_init(cross_data)
340 for guid, testbed in self._testbeds.iteritems():
341 cross_data = self._get_cross_data(guid)
342 testbed.do_cross_connect_compl(cross_data)
346 # Last chance to configure (parallel on all testbeds)
347 self._parallel([testbed.do_prestart
348 for testbed in self._testbeds.itervalues()])
352 # start experiment (parallel start on all testbeds)
353 self._parallel([testbed.start
354 for testbed in self._testbeds.itervalues()])
358 def _clear_caches(self):
359 # Cleaning cache for safety.
360 self._guids_in_testbed_cache = dict()
362 def _persist_testbed_proxies(self):
363 TRANSIENT = ('Recover',)
365 # persist access configuration for all testbeds, so that
366 # recovery mode can reconnect to them if it becomes necessary
367 conf = ConfigParser.RawConfigParser()
368 for testbed_guid, testbed_config in self._deployment_config.iteritems():
369 testbed_guid = str(testbed_guid)
370 conf.add_section(testbed_guid)
371 for attr in testbed_config.get_attribute_list():
372 if attr not in TRANSIENT:
373 conf.set(testbed_guid, attr,
374 testbed_config.get_attribute_value(attr))
376 f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
380 def _load_testbed_proxies(self):
385 BOOLEAN : 'getboolean',
388 # deferred import because proxy needs
389 # our class definitions to define proxies
390 import nepi.util.proxy as proxy
392 conf = ConfigParser.RawConfigParser()
393 conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
394 for testbed_guid in conf.sections():
395 testbed_config = proxy.AccessConfiguration()
396 for attr in conf.options(testbed_guid):
397 testbed_config.set_attribute_value(attr,
398 conf.get(testbed_guid, attr) )
400 testbed_guid = str(testbed_guid)
401 conf.add_section(testbed_guid)
402 for attr in testbed_config.get_attribute_list():
403 if attr not in TRANSIENT:
404 getter = getattr(conf, TYPEMAP.get(
405 testbed_config.get_attribute_type(attr),
407 testbed_config.set_attribute_value(
408 testbed_guid, attr, getter(attr))
410 def _unpersist_testbed_proxies(self):
412 os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
414 # Just print exceptions, this is just cleanup
416 ######## BUG ##########
417 #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
418 #traceback.print_exc(file=sys.stderr)
421 for testbed in self._testbeds.values():
423 self._unpersist_testbed_proxies()
426 # reload perviously persisted testbed access configurations
427 self._load_testbed_proxies()
429 # recreate testbed proxies by reconnecting only
430 self._init_testbed_controllers(recover = True)
432 # another time, for netrefs
433 self._init_testbed_controllers(recover = True)
435 def is_finished(self, guid):
436 testbed = self._testbed_for_guid(guid)
438 return testbed.status(guid) == AS.STATUS_FINISHED
439 raise RuntimeError("No element exists with guid %d" % guid)
441 def set(self, guid, name, value, time = TIME_NOW):
442 testbed = self._testbed_for_guid(guid)
444 testbed.set(guid, name, value, time)
446 raise RuntimeError("No element exists with guid %d" % guid)
448 def get(self, guid, name, time = TIME_NOW):
449 testbed = self._testbed_for_guid(guid)
451 return testbed.get(guid, name, time)
452 raise RuntimeError("No element exists with guid %d" % guid)
454 def get_deferred(self, guid, name, time = TIME_NOW):
455 testbed = self._testbed_for_guid(guid)
457 return testbed.get_deferred(guid, name, time)
458 raise RuntimeError("No element exists with guid %d" % guid)
460 def get_factory_id(self, guid):
461 testbed = self._testbed_for_guid(guid)
463 return testbed.get_factory_id(guid)
464 raise RuntimeError("No element exists with guid %d" % guid)
466 def get_testbed_id(self, guid):
467 testbed = self._testbed_for_guid(guid)
469 return testbed.testbed_id
470 raise RuntimeError("No element exists with guid %d" % guid)
472 def get_testbed_version(self, guid):
473 testbed = self._testbed_for_guid(guid)
475 return testbed.testbed_version
476 raise RuntimeError("No element exists with guid %d" % guid)
480 for testbed in self._testbeds.values():
484 exceptions.append(sys.exc_info())
485 for exc_info in exceptions:
486 raise exc_info[0], exc_info[1], exc_info[2]
488 def _testbed_for_guid(self, guid):
489 for testbed_guid in self._testbeds.keys():
490 if guid in self._guids_in_testbed(testbed_guid):
491 return self._testbeds[testbed_guid]
494 def _guids_in_testbed(self, testbed_guid):
495 if testbed_guid not in self._testbeds:
497 if testbed_guid not in self._guids_in_testbed_cache:
498 self._guids_in_testbed_cache[testbed_guid] = \
499 set(self._testbeds[testbed_guid].guids)
500 return self._guids_in_testbed_cache[testbed_guid]
503 def _netref_component_split(component):
504 match = COMPONENT_PATTERN.match(component)
506 return match.group("kind"), match.group("index")
508 return component, None
510 _NETREF_COMPONENT_GETTERS = {
512 lambda testbed, guid, index, name:
513 testbed.get_address(guid, int(index), name),
515 lambda testbed, guid, index, name:
516 testbed.get_route(guid, int(index), name),
518 lambda testbed, guid, index, name:
519 testbed.trace(guid, index, name),
521 lambda testbed, guid, index, name:
522 testbed.get(guid, name),
525 def resolve_netref_value(self, value, failval = None):
526 match = ATTRIBUTE_PATTERN_BASE.search(value)
528 label = match.group("label")
529 if label.startswith('GUID-'):
530 ref_guid = int(label[5:])
532 expr = match.group("expr")
533 component = (match.group("component") or "")[1:] # skip the dot
534 attribute = match.group("attribute")
536 # split compound components into component kind and index
537 # eg: 'addr[0]' -> ('addr', '0')
538 component, component_index = self._netref_component_split(component)
540 # find object and resolve expression
541 for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
542 if component not in self._NETREF_COMPONENT_GETTERS:
543 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
544 elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
547 ref_value = self._NETREF_COMPONENT_GETTERS[component](
548 ref_testbed, ref_guid, component_index, attribute)
550 return value.replace(match.group(), ref_value)
551 # couldn't find value
554 def do_netrefs(self, data, fail_if_undefined = False):
556 for (testbed_guid, guid), attrs in self._netrefs.items():
557 testbed = self._testbeds.get(testbed_guid)
558 if testbed is not None:
559 for name in set(attrs):
560 value = testbed.get(guid, name)
561 if isinstance(value, basestring):
562 ref_value = self.resolve_netref_value(value)
563 if ref_value is not None:
564 testbed.set(guid, name, ref_value)
566 elif fail_if_undefined:
567 raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
569 del self._netrefs[(testbed_guid, guid)]
572 for testbed_guid, attrs in self._testbed_netrefs.items():
573 tb_data = dict(data.get_attribute_data(testbed_guid))
575 for name in set(attrs):
576 value = tb_data.get(name)
577 if isinstance(value, basestring):
578 ref_value = self.resolve_netref_value(value)
579 if ref_value is not None:
580 data.set_attribute_data(testbed_guid, name, ref_value)
582 elif fail_if_undefined:
583 raise ValueError, "Unresolvable netref in: %r" % (value,)
585 del self._testbed_netrefs[testbed_guid]
588 def _init_testbed_controllers(self, data, recover = False):
589 blacklist_testbeds = set(self._testbeds)
590 element_guids = list()
592 data_guids = data.guids
594 # create testbed controllers
595 for guid in data_guids:
596 if data.is_testbed_data(guid):
597 if guid not in self._testbeds:
598 self._create_testbed_controller(guid, data, element_guids,
601 (testbed_guid, factory_id) = data.get_box_data(guid)
602 if testbed_guid not in blacklist_testbeds:
603 element_guids.append(guid)
604 label = data.get_attribute_data(guid, "label")
605 if label is not None:
606 if label in label_guids:
607 raise RuntimeError, "Label %r is not unique" % (label,)
608 label_guids[label] = guid
610 # replace references to elements labels for its guid
611 self._resolve_labels(data, data_guids, label_guids)
613 # program testbed controllers
615 self._program_testbed_controllers(element_guids, data)
617 def _resolve_labels(self, data, data_guids, label_guids):
618 netrefs = self._netrefs
619 testbed_netrefs = self._testbed_netrefs
620 for guid in data_guids:
621 for name, value in data.get_attribute_data(guid):
622 if isinstance(value, basestring):
623 match = ATTRIBUTE_PATTERN_BASE.search(value)
625 label = match.group("label")
626 if not label.startswith('GUID-'):
627 ref_guid = label_guids.get(label)
628 if ref_guid is not None:
629 value = ATTRIBUTE_PATTERN_BASE.sub(
630 ATTRIBUTE_PATTERN_GUID_SUB % dict(
631 guid = 'GUID-%d' % (ref_guid,),
632 expr = match.group("expr"),
635 data.set_attribute_data(guid, name, value)
637 # memorize which guid-attribute pairs require
638 # postprocessing, to avoid excessive controller-testbed
639 # communication at configuration time
640 # (which could require high-latency network I/O)
641 if not data.is_testbed_data(guid):
642 (testbed_guid, factory_id) = data.get_box_data(guid)
643 netrefs[(testbed_guid, guid)].add(name)
645 testbed_netrefs[guid].add(name)
647 def _create_testbed_controller(self, guid, data, element_guids, recover):
648 (testbed_id, testbed_version) = data.get_testbed_data(guid)
649 deployment_config = self._deployment_config.get(guid)
651 # deferred import because proxy needs
652 # our class definitions to define proxies
653 import nepi.util.proxy as proxy
655 if deployment_config is None:
657 deployment_config = proxy.AccessConfiguration()
659 for (name, value) in data.get_attribute_data(guid):
660 if value is not None and deployment_config.has_attribute(name):
661 # if any deployment config attribute has a netref, we can't
662 # create this controller yet
663 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
664 # remember to re-issue this one
665 self._netreffed_testbeds.add(guid)
668 # copy deployment config attribute
669 deployment_config.set_attribute_value(name, value)
672 self._deployment_config[guid] = deployment_config
674 if deployment_config is not None:
675 # force recovery mode
676 deployment_config.set_attribute_value("recover",recover)
678 testbed = proxy.create_testbed_controller(testbed_id,
679 testbed_version, deployment_config)
680 for (name, value) in data.get_attribute_data(guid):
681 testbed.defer_configure(name, value)
682 self._testbeds[guid] = testbed
683 if guid in self._netreffed_testbeds:
684 self._netreffed_testbeds.remove(guid)
686 def _program_testbed_controllers(self, element_guids, data):
687 for guid in element_guids:
688 (testbed_guid, factory_id) = data.get_box_data(guid)
689 testbed = self._testbeds.get(testbed_guid)
691 testbed.defer_create(guid, factory_id)
692 for (name, value) in data.get_attribute_data(guid):
693 # Try to resolve create-time netrefs, if possible
694 if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
696 nuvalue = self.resolve_netref_value(value)
698 # Any trouble means we're not in shape to resolve the netref yet
700 if nuvalue is not None:
701 # Only if we succeed we remove the netref deferral entry
703 data.set_attribute_data(guid, name, value)
704 if (testbed_guid, guid) in self._netrefs:
705 self._netrefs[(testbed_guid, guid)].discard(name)
706 testbed.defer_create_set(guid, name, value)
708 for guid in element_guids:
709 (testbed_guid, factory_id) = data.get_box_data(guid)
710 testbed = self._testbeds.get(testbed_guid)
712 for (connector_type_name, cross_guid, cross_connector_type_name) \
713 in data.get_connection_data(guid):
714 (testbed_guid, factory_id) = data.get_box_data(guid)
715 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
717 if testbed_guid == cross_testbed_guid:
718 testbed.defer_connect(guid, connector_type_name,
719 cross_guid, cross_connector_type_name)
720 for trace_id in data.get_trace_data(guid):
721 testbed.defer_add_trace(guid, trace_id)
722 for (address, netprefix, broadcast) in \
723 data.get_address_data(guid):
725 testbed.defer_add_address(guid, address, netprefix,
727 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
728 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
730 def _program_testbed_cross_connections(self, data):
731 data_guids = data.guids
733 for guid in data_guids:
734 if not data.is_testbed_data(guid):
735 (testbed_guid, factory_id) = data.get_box_data(guid)
736 testbed = self._testbeds.get(testbed_guid)
738 for (connector_type_name, cross_guid, cross_connector_type_name) \
739 in data.get_connection_data(guid):
740 (testbed_guid, factory_id) = data.get_box_data(guid)
741 (cross_testbed_guid, cross_factory_id) = data.get_box_data(
743 if testbed_guid != cross_testbed_guid:
744 cross_testbed = self._testbeds[cross_testbed_guid]
745 cross_testbed_id = cross_testbed.testbed_id
746 testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
747 cross_testbed_guid, cross_testbed_id, cross_factory_id,
748 cross_connector_type_name)
749 # save cross data for later
750 self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
753 def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
754 if testbed_guid not in self._cross_data:
755 self._cross_data[testbed_guid] = dict()
756 if cross_testbed_guid not in self._cross_data[testbed_guid]:
757 self._cross_data[testbed_guid][cross_testbed_guid] = set()
758 self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
760 def _get_cross_data(self, testbed_guid):
762 if not testbed_guid in self._cross_data:
764 for cross_testbed_guid, guid_list in \
765 self._cross_data[testbed_guid].iteritems():
766 cross_data[cross_testbed_guid] = dict()
767 cross_testbed = self._testbeds[cross_testbed_guid]
768 for cross_guid in guid_list:
769 elem_cross_data = dict(
771 _testbed_guid = cross_testbed_guid,
772 _testbed_id = cross_testbed.testbed_id,
773 _testbed_version = cross_testbed.testbed_version)
774 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
775 attribute_list = cross_testbed.get_attribute_list(cross_guid,
776 filter_flags = Attribute.DesignOnly)
777 for attr_name in attribute_list:
778 attr_value = cross_testbed.get(cross_guid, attr_name)
779 elem_cross_data[attr_name] = attr_value