Removed version number from testbed file names. Only 1 version permitted at the time.
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15
16 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
17 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
18 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19
20 class TestbedController(object):
21     def __init__(self, testbed_id, testbed_version):
22         self._testbed_id = testbed_id
23         self._testbed_version = testbed_version
24
25     @property
26     def testbed_id(self):
27         return self._testbed_id
28
29     @property
30     def testbed_version(self):
31         return self._testbed_version
32
33     @property
34     def guids(self):
35         raise NotImplementedError
36
37     def defer_configure(self, name, value):
38         """Instructs setting a configuartion attribute for the testbed instance"""
39         raise NotImplementedError
40
41     def defer_create(self, guid, factory_id):
42         """Instructs creation of element """
43         raise NotImplementedError
44
45     def defer_create_set(self, guid, name, value):
46         """Instructs setting an initial attribute on an element"""
47         raise NotImplementedError
48
49     def defer_factory_set(self, guid, name, value):
50         """Instructs setting an attribute on a factory"""
51         raise NotImplementedError
52
53     def defer_connect(self, guid1, connector_type_name1, guid2, 
54             connector_type_name2): 
55         """Instructs creation of a connection between the given connectors"""
56         raise NotImplementedError
57
58     def defer_cross_connect(self, 
59             guid, connector_type_name,
60             cross_guid, cross_testbed_guid,
61             cross_testbed_id, cross_factory_id,
62             cross_connector_type_name):
63         """
64         Instructs creation of a connection between the given connectors 
65         of different testbed instances
66         """
67         raise NotImplementedError
68
69     def defer_add_trace(self, guid, trace_id):
70         """Instructs the addition of a trace"""
71         raise NotImplementedError
72
73     def defer_add_address(self, guid, address, netprefix, broadcast): 
74         """Instructs the addition of an address"""
75         raise NotImplementedError
76
77     def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
78         """Instructs the addition of a route"""
79         raise NotImplementedError
80
81     def do_setup(self):
82         """After do_setup the testbed initial configuration is done"""
83         raise NotImplementedError
84
85     def do_create(self):
86         """
87         After do_create all instructed elements are created and 
88         attributes setted
89         """
90         raise NotImplementedError
91
92     def do_connect_init(self):
93         """
94         After do_connect_init all internal connections between testbed elements
95         are initiated
96         """
97         raise NotImplementedError
98
99     def do_connect_compl(self):
100         """
101         After do_connect all internal connections between testbed elements
102         are completed
103         """
104         raise NotImplementedError
105
106     def do_preconfigure(self):
107         """
108         Done just before resolving netrefs, after connection, before cross connections,
109         useful for early stages of configuration, for setting up stuff that might be
110         required for netref resolution.
111         """
112         raise NotImplementedError
113
114     def do_configure(self):
115         """After do_configure elements are configured"""
116         raise NotImplementedError
117
118     def do_prestart(self):
119         """Before do_start elements are prestart-configured"""
120         raise NotImplementedError
121
122     def do_cross_connect_init(self, cross_data):
123         """
124         After do_cross_connect_init initiation of all external connections 
125         between different testbed elements is performed
126         """
127         raise NotImplementedError
128
129     def do_cross_connect_compl(self, cross_data):
130         """
131         After do_cross_connect_compl completion of all external connections 
132         between different testbed elements is performed
133         """
134         raise NotImplementedError
135
136     def start(self):
137         raise NotImplementedError
138
139     def stop(self):
140         raise NotImplementedError
141
142     def set(self, guid, name, value, time = TIME_NOW):
143         raise NotImplementedError
144
145     def get(self, guid, name, time = TIME_NOW):
146         raise NotImplementedError
147     
148     def get_route(self, guid, index, attribute):
149         """
150         Params:
151             
152             guid: guid of box to query
153             index: number of routing entry to fetch
154             attribute: one of Destination, NextHop, NetPrefix
155         """
156         raise NotImplementedError
157
158     def get_address(self, guid, index, attribute='Address'):
159         """
160         Params:
161             
162             guid: guid of box to query
163             index: number of inteface to select
164             attribute: one of Address, NetPrefix, Broadcast
165         """
166         raise NotImplementedError
167
168     def get_attribute_list(self, guid, filter_flags = None, exclude = False):
169         raise NotImplementedError
170
171     def get_factory_id(self, guid):
172         raise NotImplementedError
173
174     def action(self, time, guid, action):
175         raise NotImplementedError
176
177     def status(self, guid):
178         raise NotImplementedError
179
180     def trace(self, guid, trace_id, attribute='value'):
181         raise NotImplementedError
182
183     def traces_info(self):
184         """ dictionary of dictionaries:
185             traces_info = dict({
186                 guid = dict({
187                     trace_id = dict({
188                             host = host,
189                             filepath = filepath,
190                             filesize = size in bytes,
191                         })
192                 })
193             })"""
194         raise NotImplementedError
195
196     def shutdown(self):
197         raise NotImplementedError
198
199 class ExperimentController(object):
200     def __init__(self, experiment_xml, root_dir):
201         self._experiment_design_xml = experiment_xml
202         self._experiment_execute_xml = None
203         self._testbeds = dict()
204         self._deployment_config = dict()
205         self._netrefs = collections.defaultdict(set)
206         self._testbed_netrefs = collections.defaultdict(set)
207         self._cross_data = dict()
208         self._root_dir = root_dir
209         self._netreffed_testbeds = set()
210         self._guids_in_testbed_cache = dict()
211
212         self.persist_experiment_xml()
213
214     @property
215     def experiment_design_xml(self):
216         return self._experiment_design_xml
217
218     @property
219     def experiment_execute_xml(self):
220         return self._experiment_execute_xml
221
222     @property
223     def guids(self):
224         guids = list()
225         for testbed_guid in self._testbeds.keys():
226             _guids = self._guids_in_testbed(testbed_guid)
227             if _guids:
228                 guids.extend(_guids)
229         return guids
230
231     def persist_experiment_xml(self):
232         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
233         f = open(xml_path, "w")
234         f.write(self._experiment_design_xml)
235         f.close()
236
237     def persist_execute_xml(self):
238         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
239         f = open(xml_path, "w")
240         f.write(self._experiment_execute_xml)
241         f.close()
242
243     def trace(self, guid, trace_id, attribute='value'):
244         testbed = self._testbed_for_guid(guid)
245         if testbed != None:
246             return testbed.trace(guid, trace_id, attribute)
247         raise RuntimeError("No element exists with guid %d" % guid)    
248
249     def traces_info(self):
250         traces_info = dict()
251         for guid, testbed in self._testbeds.iteritems():
252             tinfo = testbed.traces_info()
253             if tinfo:
254                 traces_info[guid] = testbed.traces_info()
255         return traces_info
256
257     @staticmethod
258     def _parallel(callables):
259         excs = []
260         def wrap(callable):
261             @functools.wraps(callable)
262             def wrapped(*p, **kw):
263                 try:
264                     callable(*p, **kw)
265                 except:
266                     import traceback
267                     traceback.print_exc(file=sys.stderr)
268                     excs.append(sys.exc_info())
269             return wrapped
270         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
271         for thread in threads:
272             thread.start()
273         for thread in threads:
274             thread.join()
275         for exc in excs:
276             eTyp, eVal, eLoc = exc
277             raise eTyp, eVal, eLoc
278
279     def start(self):
280         parser = XmlExperimentParser()
281         data = parser.from_xml_to_data(self._experiment_design_xml)
282
283         # instantiate testbed controllers
284         self._init_testbed_controllers(data)
285         
286         # persist testbed connection data, for potential recovery
287         self._persist_testbed_proxies()
288         
289         def steps_to_configure(self, allowed_guids):
290             # perform setup in parallel for all test beds,
291             # wait for all threads to finish
292             self._parallel([testbed.do_setup 
293                             for guid,testbed in self._testbeds.iteritems()
294                             if guid in allowed_guids])
295        
296             # perform create-connect in parallel, wait
297             # (internal connections only)
298             self._parallel([testbed.do_create
299                             for guid,testbed in self._testbeds.iteritems()
300                             if guid in allowed_guids])
301
302             self._parallel([testbed.do_connect_init
303                             for guid,testbed in self._testbeds.iteritems()
304                             if guid in allowed_guids])
305
306             self._parallel([testbed.do_connect_compl
307                             for guid,testbed in self._testbeds.iteritems()
308                             if guid in allowed_guids])
309
310             self._parallel([testbed.do_preconfigure
311                             for guid,testbed in self._testbeds.iteritems()
312                             if guid in allowed_guids])
313             self._clear_caches()
314
315         steps_to_configure(self, self._testbeds)
316
317         if self._netreffed_testbeds:
318             # initally resolve netrefs
319             self.do_netrefs(data, fail_if_undefined=False)
320             
321             # rinse and repeat, for netreffed testbeds
322             netreffed_testbeds = set(self._netreffed_testbeds)
323
324             self._init_testbed_controllers(data)
325             
326             # persist testbed connection data, for potential recovery
327             self._persist_testbed_proxies()
328
329             # configure dependant testbeds
330             steps_to_configure(self, netreffed_testbeds)
331             
332         # final netref step, fail if anything's left unresolved
333         self.do_netrefs(data, fail_if_undefined=True)
334         
335         # perform do_configure in parallel for al testbeds
336         # (it's internal configuration for each)
337         self._parallel([testbed.do_configure
338                         for testbed in self._testbeds.itervalues()])
339
340         self._clear_caches()
341
342         #print >>sys.stderr, "DO IT"
343         #import time
344         #time.sleep(60)
345         
346         # cross-connect (cannot be done in parallel)
347         for guid, testbed in self._testbeds.iteritems():
348             cross_data = self._get_cross_data(guid)
349             testbed.do_cross_connect_init(cross_data)
350         for guid, testbed in self._testbeds.iteritems():
351             cross_data = self._get_cross_data(guid)
352             testbed.do_cross_connect_compl(cross_data)
353        
354         self._clear_caches()
355
356         # Last chance to configure (parallel on all testbeds)
357         self._parallel([testbed.do_prestart
358                         for testbed in self._testbeds.itervalues()])
359
360         self._clear_caches()
361         
362         # update execution xml with execution-specific values
363         self._update_execute_xml()
364         self.persist_execute_xml()
365
366         # start experiment (parallel start on all testbeds)
367         self._parallel([testbed.start
368                         for testbed in self._testbeds.itervalues()])
369
370         self._clear_caches()
371
372     def _clear_caches(self):
373         # Cleaning cache for safety.
374         self._guids_in_testbed_cache = dict()
375
376     def _persist_testbed_proxies(self):
377         TRANSIENT = ('Recover',)
378         
379         # persist access configuration for all testbeds, so that
380         # recovery mode can reconnect to them if it becomes necessary
381         conf = ConfigParser.RawConfigParser()
382         for testbed_guid, testbed_config in self._deployment_config.iteritems():
383             testbed_guid = str(testbed_guid)
384             conf.add_section(testbed_guid)
385             for attr in testbed_config.get_attribute_list():
386                 if attr not in TRANSIENT:
387                     conf.set(testbed_guid, attr, 
388                         testbed_config.get_attribute_value(attr))
389         
390         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
391         conf.write(f)
392         f.close()
393     
394     def _load_testbed_proxies(self):
395         TYPEMAP = {
396             STRING : 'get',
397             INTEGER : 'getint',
398             FLOAT : 'getfloat',
399             BOOLEAN : 'getboolean',
400         }
401         
402         # deferred import because proxy needs
403         # our class definitions to define proxies
404         import nepi.util.proxy as proxy
405         
406         conf = ConfigParser.RawConfigParser()
407         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
408         for testbed_guid in conf.sections():
409             testbed_config = proxy.AccessConfiguration()
410             for attr in conf.options(testbed_guid):
411                 testbed_config.set_attribute_value(attr, 
412                     conf.get(testbed_guid, attr) )
413                 
414             testbed_guid = str(testbed_guid)
415             conf.add_section(testbed_guid)
416             for attr in testbed_config.get_attribute_list():
417                 if attr not in TRANSIENT:
418                     getter = getattr(conf, TYPEMAP.get(
419                         testbed_config.get_attribute_type(attr),
420                         'get') )
421                     testbed_config.set_attribute_value(
422                         testbed_guid, attr, getter(attr))
423     
424     def _unpersist_testbed_proxies(self):
425         try:
426             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
427         except:
428             # Just print exceptions, this is just cleanup
429             import traceback
430             ######## BUG ##########
431             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
432             #traceback.print_exc(file=sys.stderr)
433
434     def _update_execute_xml(self):
435         # For all testbeds,
436         #   For all elements in testbed,
437         #       - gather immutable execute-readable attribuets lists
438         #         asynchronously
439         # Generate new design description from design xml
440         # (Wait for attributes lists - implicit syncpoint)
441         # For all testbeds,
442         #   For all elements in testbed,
443         #       - gather all immutable execute-readable attribute
444         #         values, asynchronously
445         # (Wait for attribute values - implicit syncpoint)
446         # For all testbeds,
447         #   For all elements in testbed,
448         #       - inject non-None values into new design
449         # Generate execute xml from new design
450
451         def undefer(deferred):
452             if hasattr(deferred, '_get'):
453                 return deferred._get()
454             else:
455                 return deferred
456         
457         attribute_lists = dict(
458             (testbed_guid, collections.defaultdict(dict))
459             for testbed_guid in self._testbeds
460         )
461         
462         for testbed_guid, testbed in self._testbeds.iteritems():
463             guids = self._guids_in_testbed(testbed_guid)
464             for guid in guids:
465                 attribute_lists[testbed_guid][guid] = \
466                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
467         
468         parser = XmlExperimentParser()
469         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
470
471         attribute_values = dict(
472             (testbed_guid, collections.defaultdict(dict))
473             for testbed_guid in self._testbeds
474         )
475         
476         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
477             testbed = self._testbeds[testbed_guid]
478             for guid, attribute_list in testbed_attribute_lists.iteritems():
479                 attribute_list = undefer(attribute_list)
480                 attribute_values[testbed_guid][guid] = dict(
481                     (attribute, testbed.get_deferred(guid, attribute))
482                     for attribute in attribute_list
483                 )
484         
485         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
486             for guid, attribute_values in testbed_attribute_values.iteritems():
487                 for attribute, value in attribute_values.iteritems():
488                     value = undefer(value)
489                     if value is not None:
490                         execute_data.add_attribute_data(guid, attribute, value)
491         
492         self._experiment_execute_xml = parser.to_xml(data=execute_data)
493
494     def stop(self):
495        for testbed in self._testbeds.values():
496            testbed.stop()
497        self._unpersist_testbed_proxies()
498    
499     def recover(self):
500         # reload perviously persisted testbed access configurations
501         self._load_testbed_proxies()
502         
503         # recreate testbed proxies by reconnecting only
504         self._init_testbed_controllers(recover = True)
505         
506         # another time, for netrefs
507         self._init_testbed_controllers(recover = True)
508
509     def is_finished(self, guid):
510         testbed = self._testbed_for_guid(guid)
511         if testbed != None:
512             return testbed.status(guid) == AS.STATUS_FINISHED
513         raise RuntimeError("No element exists with guid %d" % guid)    
514
515     def set(self, guid, name, value, time = TIME_NOW):
516         testbed = self._testbed_for_guid(guid)
517         if testbed != None:
518             testbed.set(guid, name, value, time)
519         else:
520             raise RuntimeError("No element exists with guid %d" % guid)    
521
522     def get(self, guid, name, time = TIME_NOW):
523         testbed = self._testbed_for_guid(guid)
524         if testbed != None:
525             return testbed.get(guid, name, time)
526         raise RuntimeError("No element exists with guid %d" % guid)    
527
528     def get_deferred(self, guid, name, time = TIME_NOW):
529         testbed = self._testbed_for_guid(guid)
530         if testbed != None:
531             return testbed.get_deferred(guid, name, time)
532         raise RuntimeError("No element exists with guid %d" % guid)    
533
534     def get_factory_id(self, guid):
535         testbed = self._testbed_for_guid(guid)
536         if testbed != None:
537             return testbed.get_factory_id(guid)
538         raise RuntimeError("No element exists with guid %d" % guid)    
539
540     def get_testbed_id(self, guid):
541         testbed = self._testbed_for_guid(guid)
542         if testbed != None:
543             return testbed.testbed_id
544         raise RuntimeError("No element exists with guid %d" % guid)    
545
546     def get_testbed_version(self, guid):
547         testbed = self._testbed_for_guid(guid)
548         if testbed != None:
549             return testbed.testbed_version
550         raise RuntimeError("No element exists with guid %d" % guid)    
551
552     def shutdown(self):
553         exceptions = list()
554         for testbed in self._testbeds.values():
555             try:
556                 testbed.shutdown()
557             except:
558                 exceptions.append(sys.exc_info())
559         for exc_info in exceptions:
560             raise exc_info[0], exc_info[1], exc_info[2]
561
562     def _testbed_for_guid(self, guid):
563         for testbed_guid in self._testbeds.keys():
564             if guid in self._guids_in_testbed(testbed_guid):
565                 return self._testbeds[testbed_guid]
566         return None
567
568     def _guids_in_testbed(self, testbed_guid):
569         if testbed_guid not in self._testbeds:
570             return set()
571         if testbed_guid not in self._guids_in_testbed_cache:
572             self._guids_in_testbed_cache[testbed_guid] = \
573                 set(self._testbeds[testbed_guid].guids)
574         return self._guids_in_testbed_cache[testbed_guid]
575
576     @staticmethod
577     def _netref_component_split(component):
578         match = COMPONENT_PATTERN.match(component)
579         if match:
580             return match.group("kind"), match.group("index")
581         else:
582             return component, None
583
584     _NETREF_COMPONENT_GETTERS = {
585         'addr':
586             lambda testbed, guid, index, name: 
587                 testbed.get_address(guid, int(index), name),
588         'route' :
589             lambda testbed, guid, index, name: 
590                 testbed.get_route(guid, int(index), name),
591         'trace' :
592             lambda testbed, guid, index, name: 
593                 testbed.trace(guid, index, name),
594         '' : 
595             lambda testbed, guid, index, name: 
596                 testbed.get(guid, name),
597     }
598     
599     def resolve_netref_value(self, value, failval = None):
600         match = ATTRIBUTE_PATTERN_BASE.search(value)
601         if match:
602             label = match.group("label")
603             if label.startswith('GUID-'):
604                 ref_guid = int(label[5:])
605                 if ref_guid:
606                     expr = match.group("expr")
607                     component = (match.group("component") or "")[1:] # skip the dot
608                     attribute = match.group("attribute")
609                     
610                     # split compound components into component kind and index
611                     # eg: 'addr[0]' -> ('addr', '0')
612                     component, component_index = self._netref_component_split(component)
613
614                     # find object and resolve expression
615                     for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
616                         if component not in self._NETREF_COMPONENT_GETTERS:
617                             raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
618                         elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
619                             pass
620                         else:
621                             ref_value = self._NETREF_COMPONENT_GETTERS[component](
622                                 ref_testbed, ref_guid, component_index, attribute)
623                             if ref_value:
624                                 return value.replace(match.group(), ref_value)
625         # couldn't find value
626         return failval
627     
628     def do_netrefs(self, data, fail_if_undefined = False):
629         # element netrefs
630         for (testbed_guid, guid), attrs in self._netrefs.items():
631             testbed = self._testbeds.get(testbed_guid)
632             if testbed is not None:
633                 for name in set(attrs):
634                     value = testbed.get(guid, name)
635                     if isinstance(value, basestring):
636                         ref_value = self.resolve_netref_value(value)
637                         if ref_value is not None:
638                             testbed.set(guid, name, ref_value)
639                             attrs.remove(name)
640                         elif fail_if_undefined:
641                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
642                 if not attrs:
643                     del self._netrefs[(testbed_guid, guid)]
644         
645         # testbed netrefs
646         for testbed_guid, attrs in self._testbed_netrefs.items():
647             tb_data = dict(data.get_attribute_data(testbed_guid))
648             if data:
649                 for name in set(attrs):
650                     value = tb_data.get(name)
651                     if isinstance(value, basestring):
652                         ref_value = self.resolve_netref_value(value)
653                         if ref_value is not None:
654                             data.set_attribute_data(testbed_guid, name, ref_value)
655                             attrs.remove(name)
656                         elif fail_if_undefined:
657                             raise ValueError, "Unresolvable netref in: %r" % (value,)
658                 if not attrs:
659                     del self._testbed_netrefs[testbed_guid]
660         
661
662     def _init_testbed_controllers(self, data, recover = False):
663         blacklist_testbeds = set(self._testbeds)
664         element_guids = list()
665         label_guids = dict()
666         data_guids = data.guids
667
668         # create testbed controllers
669         for guid in data_guids:
670             if data.is_testbed_data(guid):
671                 if guid not in self._testbeds:
672                     self._create_testbed_controller(guid, data, element_guids,
673                             recover)
674             else:
675                 (testbed_guid, factory_id) = data.get_box_data(guid)
676                 if testbed_guid not in blacklist_testbeds:
677                     element_guids.append(guid)
678                     label = data.get_attribute_data(guid, "label")
679                     if label is not None:
680                         if label in label_guids:
681                             raise RuntimeError, "Label %r is not unique" % (label,)
682                         label_guids[label] = guid
683
684         # replace references to elements labels for its guid
685         self._resolve_labels(data, data_guids, label_guids)
686     
687         # program testbed controllers
688         if not recover:
689             self._program_testbed_controllers(element_guids, data)
690
691     def _resolve_labels(self, data, data_guids, label_guids):
692         netrefs = self._netrefs
693         testbed_netrefs = self._testbed_netrefs
694         for guid in data_guids:
695             for name, value in data.get_attribute_data(guid):
696                 if isinstance(value, basestring):
697                     match = ATTRIBUTE_PATTERN_BASE.search(value)
698                     if match:
699                         label = match.group("label")
700                         if not label.startswith('GUID-'):
701                             ref_guid = label_guids.get(label)
702                             if ref_guid is not None:
703                                 value = ATTRIBUTE_PATTERN_BASE.sub(
704                                     ATTRIBUTE_PATTERN_GUID_SUB % dict(
705                                         guid = 'GUID-%d' % (ref_guid,),
706                                         expr = match.group("expr"),
707                                         label = label), 
708                                     value)
709                                 data.set_attribute_data(guid, name, value)
710                                 
711                                 # memorize which guid-attribute pairs require
712                                 # postprocessing, to avoid excessive controller-testbed
713                                 # communication at configuration time
714                                 # (which could require high-latency network I/O)
715                                 if not data.is_testbed_data(guid):
716                                     (testbed_guid, factory_id) = data.get_box_data(guid)
717                                     netrefs[(testbed_guid, guid)].add(name)
718                                 else:
719                                     testbed_netrefs[guid].add(name)
720
721     def _create_testbed_controller(self, guid, data, element_guids, recover):
722         (testbed_id, testbed_version) = data.get_testbed_data(guid)
723         deployment_config = self._deployment_config.get(guid)
724         
725         # deferred import because proxy needs
726         # our class definitions to define proxies
727         import nepi.util.proxy as proxy
728         
729         if deployment_config is None:
730             # need to create one
731             deployment_config = proxy.AccessConfiguration()
732             
733             for (name, value) in data.get_attribute_data(guid):
734                 if value is not None and deployment_config.has_attribute(name):
735                     # if any deployment config attribute has a netref, we can't
736                     # create this controller yet
737                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
738                         # remember to re-issue this one
739                         self._netreffed_testbeds.add(guid)
740                         return
741                     
742                     # copy deployment config attribute
743                     deployment_config.set_attribute_value(name, value)
744             
745             # commit config
746             self._deployment_config[guid] = deployment_config
747         
748         if deployment_config is not None:
749             # force recovery mode 
750             deployment_config.set_attribute_value("recover",recover)
751         
752         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
753                 deployment_config)
754         for (name, value) in data.get_attribute_data(guid):
755             testbed.defer_configure(name, value)
756         self._testbeds[guid] = testbed
757         if guid in self._netreffed_testbeds:
758             self._netreffed_testbeds.remove(guid)
759
760     def _program_testbed_controllers(self, element_guids, data):
761         deferred = dict()
762
763         def add_deferred_testbed(deferred, testbed_guid):
764             if not testbed_guid in deferred:
765                 deferred[testbed_guid] = dict()
766                 deferred[testbed_guid]["connections"] = set()
767                 deferred[testbed_guid]["cross_connections"] = set()
768
769         def add_deferred_connection(deferred, data, guid, connector_type_name,
770                 other_guid, other_connector_type_name):
771
772             (testbed_guid, factory_id) = data.get_box_data(guid)
773             (other_testbed_guid, other_factory_id) = data.get_box_data(
774                     other_guid)
775             testbed = self._testbeds[testbed_guid]
776             testbed_id = testbed.testbed_id
777             add_deferred_testbed(deferred, testbed_guid)
778
779             if testbed_guid == other_testbed_guid:
780                 # each testbed should take care of enforcing internal
781                 # connection simmetry, so each connection is only
782                 # added in one direction
783                 c = (guid, connector_type_name, other_guid,
784                         other_connector_type_name)
785                 deferred[testbed_guid]["connections"].add(c)
786             else:
787                 # the controller takes care of cross_connection simmetry
788                 # so cross_connections are added in both directions
789                 other_testbed = self._testbeds[other_testbed_guid]
790                 other_testbed_id = other_testbed.testbed_id
791                 add_deferred_testbed(deferred, other_testbed_guid)
792                 c1 = (testbed_guid, testbed_id, guid, factory_id,
793                         connector_type_name, other_testbed_guid,
794                         other_testbed_id, other_guid, other_factory_id,
795                         other_connector_type_name)
796                 c2 = (other_testbed_guid, other_testbed_id, other_guid, 
797                         other_factory_id, other_connector_type_name,
798                         testbed_guid, testbed_id, guid, factory_id,
799                         connector_type_name)
800                 deferred[testbed_guid]["cross_connections"].add(c1)
801                 deferred[other_testbed_guid]["cross_connections"].add(c2)
802
803         def resolve_create_netref(data, guid, name, value): 
804             # Try to resolve create-time netrefs, if possible
805             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
806                 try:
807                     nuvalue = self.resolve_netref_value(value)
808                 except:
809                     # Any trouble means we're not in shape to resolve the netref yet
810                     nuvalue = None
811                 if nuvalue is not None:
812                     # Only if we succeed we remove the netref deferral entry
813                     value = nuvalue
814                     data.set_attribute_data(guid, name, value)
815                     if (testbed_guid, guid) in self._netrefs:
816                         self._netrefs[(testbed_guid, guid)].discard(name)
817             return value
818
819         for guid in element_guids:
820             (testbed_guid, factory_id) = data.get_box_data(guid)
821             testbed = self._testbeds.get(testbed_guid)
822             # create
823             testbed.defer_create(guid, factory_id)
824             # set attributes
825             for (name, value) in data.get_attribute_data(guid):
826                 value = resolve_create_netref(data, guid, name, value)
827                 testbed.defer_create_set(guid, name, value)
828             # traces
829             for trace_id in data.get_trace_data(guid):
830                 testbed.defer_add_trace(guid, trace_id)
831             # addresses
832             for (address, netprefix, broadcast) in data.get_address_data(guid):
833                 if address != None:
834                     testbed.defer_add_address(guid, address, netprefix, 
835                             broadcast)
836             # routes
837             for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
838                 testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
839             # store connections data
840             for (connector_type_name, other_guid, other_connector_type_name) \
841                     in data.get_connection_data(guid):
842                 add_deferred_connection(deferred, data, guid,
843                         connector_type_name, other_guid,
844                         other_connector_type_name)
845
846         # connections        
847         for testbed_guid, data in deferred.iteritems():
848             testbed = self._testbeds.get(testbed_guid)
849             for (guid, connector_type_name, other_guid,
850                     other_connector_type_name) in data["connections"]:
851                 testbed.defer_connect(guid, connector_type_name, 
852                         other_guid, other_connector_type_name)
853             for (testbed_guid, testbed_id, guid, factory_id,
854                         connector_type_name, other_testbed_guid,
855                         other_testbed_id, other_guid, other_factory_id,
856                         other_connector_type_name) in data["cross_connections"]:
857                 testbed.defer_cross_connect(guid, connector_type_name, other_guid, 
858                         other_testbed_guid, other_testbed_id, other_factory_id, 
859                         other_connector_type_name)
860                 # save cross data for later
861                 self._add_crossdata(testbed_guid, guid, other_testbed_guid,
862                         other_guid)
863
864     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
865         if testbed_guid not in self._cross_data:
866             self._cross_data[testbed_guid] = dict()
867         if cross_testbed_guid not in self._cross_data[testbed_guid]:
868             self._cross_data[testbed_guid][cross_testbed_guid] = set()
869         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
870
871     def _get_cross_data(self, testbed_guid):
872         cross_data = dict()
873         if not testbed_guid in self._cross_data:
874             return cross_data
875         for cross_testbed_guid, guid_list in \
876                 self._cross_data[testbed_guid].iteritems():
877             cross_data[cross_testbed_guid] = dict()
878             cross_testbed = self._testbeds[cross_testbed_guid]
879             for cross_guid in guid_list:
880                 elem_cross_data = dict(
881                     _guid = cross_guid,
882                     _testbed_guid = cross_testbed_guid,
883                     _testbed_id = cross_testbed.testbed_id,
884                     _testbed_version = cross_testbed.testbed_version)
885                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
886                 attribute_list = cross_testbed.get_attribute_list(cross_guid)
887                 for attr_name in attribute_list:
888                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
889                     elem_cross_data[attr_name] = attr_value
890         return cross_data
891