Reconnection fix: testbed deserialization was completely borked
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15
# Matches a netref placeholder embedded in an attribute value, of the form
#   {#[<label>]<expr>#}
# where <expr> is an optional component selector (".addr[N]", ".route[N]" or
# ".trace[N]") followed by one separator character and "[<attribute>]".
# Named groups: label, expr, component, attribute.
# NOTE(review): the separator in front of "[<attribute>]" is an unescaped "."
# and therefore matches any single character, not only a literal dot -
# confirm whether that is intended.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template used to rebuild a netref with a "guid" label and the original
# expression (keys: guid, expr).
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into kind ("addr") and
# index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19
20 def _undefer(deferred):
21     if hasattr(deferred, '_get'):
22         return deferred._get()
23     else:
24         return deferred
25
26
class TestbedController(object):
    """
    Interface of a testbed controller.

    Only the testbed id/version accessors are implemented here; every other
    member raises NotImplementedError and has to be provided by a concrete
    controller.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_version = testbed_version
        self._testbed_id = testbed_id

    @property
    def testbed_id(self):
        """Testbed identifier given at construction time"""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction time"""
        return self._testbed_version

    @property
    def guids(self):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError()

    def defer_create(self, guid, factory_id):
        """Instructs creation of an element"""
        raise NotImplementedError()

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError()

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError()

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError()

    def defer_cross_connect(self,
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError()

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError()

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError()

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError()

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError()

    def do_create(self):
        """
        After do_create all instructed elements are created and
        their attributes set
        """
        raise NotImplementedError()

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed
        elements are initiated
        """
        raise NotImplementedError()

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError()

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection and before
        cross connections. Useful for early stages of configuration, for
        setting up stuff that might be required for netref resolution.
        """
        raise NotImplementedError()

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError()

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError()

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError()

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError()

    def start(self):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def stop(self):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def set(self, guid, name, value, time = TIME_NOW):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def get(self, guid, name, time = TIME_NOW):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError()

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError()

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def get_factory_id(self, guid):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def action(self, time, guid, action):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def status(self, guid):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def trace(self, guid, trace_id, attribute='value'):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError()

    def shutdown(self):
        """Not implemented here; concrete controllers must override"""
        raise NotImplementedError()
205
206 class ExperimentController(object):
207     def __init__(self, experiment_xml, root_dir):
208         self._experiment_design_xml = experiment_xml
209         self._experiment_execute_xml = None
210         self._testbeds = dict()
211         self._deployment_config = dict()
212         self._netrefs = collections.defaultdict(set)
213         self._testbed_netrefs = collections.defaultdict(set)
214         self._cross_data = dict()
215         self._root_dir = root_dir
216         self._netreffed_testbeds = set()
217         self._guids_in_testbed_cache = dict()
218
219         self.persist_experiment_xml()
220
221     @property
222     def experiment_design_xml(self):
223         return self._experiment_design_xml
224
225     @property
226     def experiment_execute_xml(self):
227         return self._experiment_execute_xml
228
229     @property
230     def guids(self):
231         guids = list()
232         for testbed_guid in self._testbeds.keys():
233             _guids = self._guids_in_testbed(testbed_guid)
234             if _guids:
235                 guids.extend(_guids)
236         return guids
237
238     def persist_experiment_xml(self):
239         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
240         f = open(xml_path, "w")
241         f.write(self._experiment_design_xml)
242         f.close()
243
244     def persist_execute_xml(self):
245         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
246         f = open(xml_path, "w")
247         f.write(self._experiment_execute_xml)
248         f.close()
249
250     def trace(self, guid, trace_id, attribute='value'):
251         testbed = self._testbed_for_guid(guid)
252         if testbed != None:
253             return testbed.trace(guid, trace_id, attribute)
254         raise RuntimeError("No element exists with guid %d" % guid)    
255
256     def traces_info(self):
257         traces_info = dict()
258         for guid, testbed in self._testbeds.iteritems():
259             tinfo = testbed.traces_info()
260             if tinfo:
261                 traces_info[guid] = testbed.traces_info()
262         return traces_info
263
264     @staticmethod
265     def _parallel(callables):
266         excs = []
267         def wrap(callable):
268             @functools.wraps(callable)
269             def wrapped(*p, **kw):
270                 try:
271                     callable(*p, **kw)
272                 except:
273                     import traceback
274                     traceback.print_exc(file=sys.stderr)
275                     excs.append(sys.exc_info())
276             return wrapped
277         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
278         for thread in threads:
279             thread.start()
280         for thread in threads:
281             thread.join()
282         for exc in excs:
283             eTyp, eVal, eLoc = exc
284             raise eTyp, eVal, eLoc
285
    def start(self):
        """
        Deploys and starts the experiment described by the design xml.

        Testbed controllers are instantiated and taken through their
        setup/create/connect/preconfigure steps. Testbeds that could not be
        created at first because their configuration contains netrefs get a
        second round once those netrefs have been resolved. Afterwards cross
        connections are programmed, testbeds are configured, cross-connected
        and prestarted, the execute xml is generated and persisted, and
        finally all testbeds are started.
        """
        parser = XmlExperimentParser()
        data = parser.from_xml_to_data(self._experiment_design_xml)

        # instantiate testbed controllers
        self._init_testbed_controllers(data)
        
        # persist testbed connection data, for potential recovery
        self._persist_testbed_proxies()
        
        def steps_to_configure(self, allowed_guids):
            # Runs the initial deployment steps, each one in parallel, but
            # only on the testbeds whose guid is in allowed_guids.

            # perform setup in parallel for all testbeds,
            # wait for all threads to finish
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        # first round: every testbed created so far is allowed
        steps_to_configure(self, self._testbeds)

        if self._netreffed_testbeds:
            # initially resolve netrefs
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            # (snapshot taken first: creating a controller removes its guid
            # from self._netreffed_testbeds)
            netreffed_testbeds = set(self._netreffed_testbeds)

            self._init_testbed_controllers(data)
            
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()

            # configure dependent testbeds
            steps_to_configure(self, netreffed_testbeds)
            
        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
       
        # Only now that netref dependencies have been solved is it safe to
        # program cross_connections
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure
                        for testbed in self._testbeds.itervalues()])

        self._clear_caches()

        #print >>sys.stderr, "DO IT"
        #import time
        #time.sleep(60)
        
        # cross-connect (cannot be done in parallel)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._parallel([testbed.do_prestart
                        for testbed in self._testbeds.itervalues()])

        self._clear_caches()
        
        # update execution xml with execution-specific values
        # TODO: known bug - serializing all attribute values fails for
        # some of them (e.g. tun_key, which is non-ASCII)
        self._update_execute_xml()
        self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start
                        for testbed in self._testbeds.itervalues()])

        self._clear_caches()
383
384     def _clear_caches(self):
385         # Cleaning cache for safety.
386         self._guids_in_testbed_cache = dict()
387
388     def _persist_testbed_proxies(self):
389         TRANSIENT = ('Recover',)
390         
391         # persist access configuration for all testbeds, so that
392         # recovery mode can reconnect to them if it becomes necessary
393         conf = ConfigParser.RawConfigParser()
394         for testbed_guid, testbed_config in self._deployment_config.iteritems():
395             testbed_guid = str(testbed_guid)
396             conf.add_section(testbed_guid)
397             for attr in testbed_config.get_attribute_list():
398                 if attr not in TRANSIENT:
399                     conf.set(testbed_guid, attr, 
400                         testbed_config.get_attribute_value(attr))
401         
402         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
403         conf.write(f)
404         f.close()
405     
406     def _load_testbed_proxies(self):
407         TYPEMAP = {
408             Attribute.STRING : 'get',
409             Attribute.BOOL : 'getboolean',
410             Attribute.ENUM : 'get',
411             Attribute.DOUBLE : 'getfloat',
412             Attribute.INTEGER : 'getint',
413         }
414         
415         # deferred import because proxy needs
416         # our class definitions to define proxies
417         import nepi.util.proxy as proxy
418         
419         conf = ConfigParser.RawConfigParser()
420         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
421         for testbed_guid in conf.sections():
422             testbed_config = proxy.AccessConfiguration()
423             testbed_guid = str(testbed_guid)
424             for attr in testbed_config.get_attribute_list():
425                 if attr not in TRANSIENT:
426                     getter = getattr(conf, TYPEMAP.get(
427                         testbed_config.get_attribute_type(attr),
428                         'get') )
429                     testbed_config.set_attribute_value(
430                         attr, getter(testbed_guid, attr))
431     
432     def _unpersist_testbed_proxies(self):
433         try:
434             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
435         except:
436             # Just print exceptions, this is just cleanup
437             import traceback
438             ######## BUG ##########
439             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
440             #traceback.print_exc(file=sys.stderr)
441
442     def _update_execute_xml(self):
443         # For all testbeds,
444         #   For all elements in testbed,
445         #       - gather immutable execute-readable attribuets lists
446         #         asynchronously
447         # Generate new design description from design xml
448         # (Wait for attributes lists - implicit syncpoint)
449         # For all testbeds,
450         #   For all elements in testbed,
451         #       - gather all immutable execute-readable attribute
452         #         values, asynchronously
453         # (Wait for attribute values - implicit syncpoint)
454         # For all testbeds,
455         #   For all elements in testbed,
456         #       - inject non-None values into new design
457         # Generate execute xml from new design
458
459         attribute_lists = dict(
460             (testbed_guid, collections.defaultdict(dict))
461             for testbed_guid in self._testbeds
462         )
463         
464         for testbed_guid, testbed in self._testbeds.iteritems():
465             guids = self._guids_in_testbed(testbed_guid)
466             for guid in guids:
467                 attribute_lists[testbed_guid][guid] = \
468                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
469         
470         parser = XmlExperimentParser()
471         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
472
473         attribute_values = dict(
474             (testbed_guid, collections.defaultdict(dict))
475             for testbed_guid in self._testbeds
476         )
477         
478         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
479             testbed = self._testbeds[testbed_guid]
480             for guid, attribute_list in testbed_attribute_lists.iteritems():
481                 attribute_list = _undefer(attribute_list)
482                 attribute_values[testbed_guid][guid] = dict(
483                     (attribute, testbed.get_deferred(guid, attribute))
484                     for attribute in attribute_list
485                 )
486         
487         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
488             for guid, attribute_values in testbed_attribute_values.iteritems():
489                 for attribute, value in attribute_values.iteritems():
490                     value = _undefer(value)
491                     if value is not None:
492                         execute_data.add_attribute_data(guid, attribute, value)
493         
494         self._experiment_execute_xml = parser.to_xml(data=execute_data)
495
496     def stop(self):
497        for testbed in self._testbeds.values():
498            testbed.stop()
499        self._unpersist_testbed_proxies()
500    
501     def recover(self):
502         # reload perviously persisted testbed access configurations
503         self._load_testbed_proxies()
504         
505         # recreate testbed proxies by reconnecting only
506         self._init_testbed_controllers(recover = True)
507         
508         # another time, for netrefs
509         self._init_testbed_controllers(recover = True)
510
511     def is_finished(self, guid):
512         testbed = self._testbed_for_guid(guid)
513         if testbed != None:
514             return testbed.status(guid) == AS.STATUS_FINISHED
515         raise RuntimeError("No element exists with guid %d" % guid)    
516
517     def set(self, guid, name, value, time = TIME_NOW):
518         testbed = self._testbed_for_guid(guid)
519         if testbed != None:
520             testbed.set(guid, name, value, time)
521         else:
522             raise RuntimeError("No element exists with guid %d" % guid)    
523
524     def get(self, guid, name, time = TIME_NOW):
525         testbed = self._testbed_for_guid(guid)
526         if testbed != None:
527             return testbed.get(guid, name, time)
528         raise RuntimeError("No element exists with guid %d" % guid)    
529
530     def get_deferred(self, guid, name, time = TIME_NOW):
531         testbed = self._testbed_for_guid(guid)
532         if testbed != None:
533             return testbed.get_deferred(guid, name, time)
534         raise RuntimeError("No element exists with guid %d" % guid)    
535
536     def get_factory_id(self, guid):
537         testbed = self._testbed_for_guid(guid)
538         if testbed != None:
539             return testbed.get_factory_id(guid)
540         raise RuntimeError("No element exists with guid %d" % guid)    
541
542     def get_testbed_id(self, guid):
543         testbed = self._testbed_for_guid(guid)
544         if testbed != None:
545             return testbed.testbed_id
546         raise RuntimeError("No element exists with guid %d" % guid)    
547
548     def get_testbed_version(self, guid):
549         testbed = self._testbed_for_guid(guid)
550         if testbed != None:
551             return testbed.testbed_version
552         raise RuntimeError("No element exists with guid %d" % guid)    
553
554     def shutdown(self):
555         exceptions = list()
556         for testbed in self._testbeds.values():
557             try:
558                 testbed.shutdown()
559             except:
560                 exceptions.append(sys.exc_info())
561         for exc_info in exceptions:
562             raise exc_info[0], exc_info[1], exc_info[2]
563
564     def _testbed_for_guid(self, guid):
565         for testbed_guid in self._testbeds.keys():
566             if guid in self._guids_in_testbed(testbed_guid):
567                 return self._testbeds[testbed_guid]
568         return None
569
570     def _guids_in_testbed(self, testbed_guid):
571         if testbed_guid not in self._testbeds:
572             return set()
573         if testbed_guid not in self._guids_in_testbed_cache:
574             self._guids_in_testbed_cache[testbed_guid] = \
575                 set(self._testbeds[testbed_guid].guids)
576         return self._guids_in_testbed_cache[testbed_guid]
577
578     @staticmethod
579     def _netref_component_split(component):
580         match = COMPONENT_PATTERN.match(component)
581         if match:
582             return match.group("kind"), match.group("index")
583         else:
584             return component, None
585
    # Dispatch table used to resolve netrefs: maps the component kind of a
    # netref expression to a function (testbed, guid, index, name) that
    # fetches the referenced value from the testbed. The empty kind stands
    # for a plain attribute reference, where index is not used.
    _NETREF_COMPONENT_GETTERS = {
        # address entry, index converted to int
        'addr':
            lambda testbed, guid, index, name: 
                testbed.get_address(guid, int(index), name),
        # routing entry, index converted to int
        'route' :
            lambda testbed, guid, index, name: 
                testbed.get_route(guid, int(index), name),
        # trace: the index is passed on unconverted, as the trace id
        'trace' :
            lambda testbed, guid, index, name: 
                testbed.trace(guid, index, name),
        # no component: attribute of the element itself
        '' : 
            lambda testbed, guid, index, name: 
                testbed.get(guid, name),
    }
600     
601     def resolve_netref_value(self, value, failval = None):
602         match = ATTRIBUTE_PATTERN_BASE.search(value)
603         if match:
604             label = match.group("label")
605             if label.startswith('GUID-'):
606                 ref_guid = int(label[5:])
607                 if ref_guid:
608                     expr = match.group("expr")
609                     component = (match.group("component") or "")[1:] # skip the dot
610                     attribute = match.group("attribute")
611                     
612                     # split compound components into component kind and index
613                     # eg: 'addr[0]' -> ('addr', '0')
614                     component, component_index = self._netref_component_split(component)
615
616                     # find object and resolve expression
617                     for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
618                         if component not in self._NETREF_COMPONENT_GETTERS:
619                             raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
620                         elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
621                             pass
622                         else:
623                             ref_value = self._NETREF_COMPONENT_GETTERS[component](
624                                 ref_testbed, ref_guid, component_index, attribute)
625                             if ref_value:
626                                 return value.replace(match.group(), ref_value)
627         # couldn't find value
628         return failval
629     
    def do_netrefs(self, data, fail_if_undefined = False):
        """
        Tries to resolve all pending netrefs.

        Element netrefs (self._netrefs) are read from and written back to
        the owning testbed; testbed netrefs (self._testbed_netrefs) are read
        from and written back to the experiment description (data).
        Resolved attributes are removed from the pending sets.

        If fail_if_undefined is set, a string value whose netref can't be
        resolved raises ValueError.
        """
        # element netrefs
        # (.items() snapshot: entries get deleted while looping)
        for (testbed_guid, guid), attrs in self._netrefs.items():
            testbed = self._testbeds.get(testbed_guid)
            # elements of testbeds that don't exist (yet) stay pending
            if testbed is not None:
                # iterate over a copy, since attrs is modified in the loop
                for name in set(attrs):
                    value = testbed.get(guid, name)
                    if isinstance(value, basestring):
                        ref_value = self.resolve_netref_value(value)
                        if ref_value is not None:
                            testbed.set(guid, name, ref_value)
                            attrs.remove(name)
                        elif fail_if_undefined:
                            raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
                if not attrs:
                    del self._netrefs[(testbed_guid, guid)]
        
        # testbed netrefs
        # (.items() snapshot: entries get deleted while looping)
        for testbed_guid, attrs in self._testbed_netrefs.items():
            tb_data = dict(data.get_attribute_data(testbed_guid))
            # NOTE(review): this tests 'data', which has already been used
            # on the previous line - possibly 'tb_data' was meant; confirm
            if data:
                # iterate over a copy, since attrs is modified in the loop
                for name in set(attrs):
                    value = tb_data.get(name)
                    if isinstance(value, basestring):
                        ref_value = self.resolve_netref_value(value)
                        if ref_value is not None:
                            data.set_attribute_data(testbed_guid, name, ref_value)
                            attrs.remove(name)
                        elif fail_if_undefined:
                            raise ValueError, "Unresolvable netref in: %r" % (value,)
                if not attrs:
                    del self._testbed_netrefs[testbed_guid]
662         
663
664     def _init_testbed_controllers(self, data, recover = False):
665         blacklist_testbeds = set(self._testbeds)
666         element_guids = list()
667         label_guids = dict()
668         data_guids = data.guids
669
670         # create testbed controllers
671         for guid in data_guids:
672             if data.is_testbed_data(guid):
673                 if guid not in self._testbeds:
674                     self._create_testbed_controller(guid, data, element_guids,
675                             recover)
676             else:
677                 (testbed_guid, factory_id) = data.get_box_data(guid)
678                 if testbed_guid not in blacklist_testbeds:
679                     element_guids.append(guid)
680                     label = data.get_attribute_data(guid, "label")
681                     if label is not None:
682                         if label in label_guids:
683                             raise RuntimeError, "Label %r is not unique" % (label,)
684                         label_guids[label] = guid
685
686         # replace references to elements labels for its guid
687         self._resolve_labels(data, data_guids, label_guids)
688     
689         # program testbed controllers
690         if not recover:
691             self._program_testbed_controllers(element_guids, data)
692
693     def _resolve_labels(self, data, data_guids, label_guids):
694         netrefs = self._netrefs
695         testbed_netrefs = self._testbed_netrefs
696         for guid in data_guids:
697             for name, value in data.get_attribute_data(guid):
698                 if isinstance(value, basestring):
699                     match = ATTRIBUTE_PATTERN_BASE.search(value)
700                     if match:
701                         label = match.group("label")
702                         if not label.startswith('GUID-'):
703                             ref_guid = label_guids.get(label)
704                             if ref_guid is not None:
705                                 value = ATTRIBUTE_PATTERN_BASE.sub(
706                                     ATTRIBUTE_PATTERN_GUID_SUB % dict(
707                                         guid = 'GUID-%d' % (ref_guid,),
708                                         expr = match.group("expr"),
709                                         label = label), 
710                                     value)
711                                 data.set_attribute_data(guid, name, value)
712                                 
713                                 # memorize which guid-attribute pairs require
714                                 # postprocessing, to avoid excessive controller-testbed
715                                 # communication at configuration time
716                                 # (which could require high-latency network I/O)
717                                 if not data.is_testbed_data(guid):
718                                     (testbed_guid, factory_id) = data.get_box_data(guid)
719                                     netrefs[(testbed_guid, guid)].add(name)
720                                 else:
721                                     testbed_netrefs[guid].add(name)
722
723     def _create_testbed_controller(self, guid, data, element_guids, recover):
724         (testbed_id, testbed_version) = data.get_testbed_data(guid)
725         deployment_config = self._deployment_config.get(guid)
726         
727         # deferred import because proxy needs
728         # our class definitions to define proxies
729         import nepi.util.proxy as proxy
730         
731         if deployment_config is None:
732             # need to create one
733             deployment_config = proxy.AccessConfiguration()
734             
735             for (name, value) in data.get_attribute_data(guid):
736                 if value is not None and deployment_config.has_attribute(name):
737                     # if any deployment config attribute has a netref, we can't
738                     # create this controller yet
739                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
740                         # remember to re-issue this one
741                         self._netreffed_testbeds.add(guid)
742                         return
743                     
744                     # copy deployment config attribute
745                     deployment_config.set_attribute_value(name, value)
746             
747             # commit config
748             self._deployment_config[guid] = deployment_config
749         
750         if deployment_config is not None:
751             # force recovery mode 
752             deployment_config.set_attribute_value("recover",recover)
753         
754         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
755                 deployment_config)
756         for (name, value) in data.get_attribute_data(guid):
757             testbed.defer_configure(name, value)
758         self._testbeds[guid] = testbed
759         if guid in self._netreffed_testbeds:
760             self._netreffed_testbeds.remove(guid)
761
762     def _program_testbed_controllers(self, element_guids, data):
763         def resolve_create_netref(data, guid, name, value): 
764             # Try to resolve create-time netrefs, if possible
765             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
766                 try:
767                     nuvalue = self.resolve_netref_value(value)
768                 except:
769                     # Any trouble means we're not in shape to resolve the netref yet
770                     nuvalue = None
771                 if nuvalue is not None:
772                     # Only if we succeed we remove the netref deferral entry
773                     value = nuvalue
774                     data.set_attribute_data(guid, name, value)
775                     if (testbed_guid, guid) in self._netrefs:
776                         self._netrefs[(testbed_guid, guid)].discard(name)
777             return value
778
779         for guid in element_guids:
780             (testbed_guid, factory_id) = data.get_box_data(guid)
781             testbed = self._testbeds.get(testbed_guid)
782             if testbed is not None:
783                 # create
784                 testbed.defer_create(guid, factory_id)
785                 # set attributes
786                 for (name, value) in data.get_attribute_data(guid):
787                     value = resolve_create_netref(data, guid, name, value)
788                     testbed.defer_create_set(guid, name, value)
789
790         for guid in element_guids:
791             (testbed_guid, factory_id) = data.get_box_data(guid)
792             testbed = self._testbeds.get(testbed_guid)
793             if testbed is not None:
794                 # traces
795                 for trace_id in data.get_trace_data(guid):
796                     testbed.defer_add_trace(guid, trace_id)
797                 # addresses
798                 for (address, netprefix, broadcast) in data.get_address_data(guid):
799                     if address != None:
800                         testbed.defer_add_address(guid, address, netprefix, 
801                                 broadcast)
802                 # routes
803                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
804                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
805                 # store connections data
806                 for (connector_type_name, other_guid, other_connector_type_name) \
807                         in data.get_connection_data(guid):
808                     (other_testbed_guid, other_factory_id) = data.get_box_data(
809                             other_guid)
810                     if testbed_guid == other_testbed_guid:
811                         # each testbed should take care of enforcing internal
812                         # connection simmetry, so each connection is only
813                         # added in one direction
814                         testbed.defer_connect(guid, connector_type_name, 
815                                 other_guid, other_connector_type_name)
816
817     def _program_testbed_cross_connections(self, data):
818         data_guids = data.guids
819         for guid in data_guids: 
820             if not data.is_testbed_data(guid):
821                 (testbed_guid, factory_id) = data.get_box_data(guid)
822                 testbed = self._testbeds.get(testbed_guid)
823                 if testbed is not None:
824                     for (connector_type_name, cross_guid, cross_connector_type_name) \
825                             in data.get_connection_data(guid):
826                         (testbed_guid, factory_id) = data.get_box_data(guid)
827                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
828                                 cross_guid)
829                         if testbed_guid != cross_testbed_guid:
830                             cross_testbed = self._testbeds[cross_testbed_guid]
831                             cross_testbed_id = cross_testbed.testbed_id
832                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
833                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
834                                     cross_connector_type_name)
835                             # save cross data for later
836                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
837                                     cross_guid)
838
839     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
840         if testbed_guid not in self._cross_data:
841             self._cross_data[testbed_guid] = dict()
842         if cross_testbed_guid not in self._cross_data[testbed_guid]:
843             self._cross_data[testbed_guid][cross_testbed_guid] = set()
844         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
845
846     def _get_cross_data(self, testbed_guid):
847         cross_data = dict()
848         if not testbed_guid in self._cross_data:
849             return cross_data
850
851         # fetch attribute lists in one batch
852         attribute_lists = dict()
853         for cross_testbed_guid, guid_list in \
854                 self._cross_data[testbed_guid].iteritems():
855             cross_testbed = self._testbeds[cross_testbed_guid]
856             for cross_guid in guid_list:
857                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
858                     cross_testbed.get_attribute_list_deferred(cross_guid)
859
860         # fetch attribute values in another batch
861         for cross_testbed_guid, guid_list in \
862                 self._cross_data[testbed_guid].iteritems():
863             cross_data[cross_testbed_guid] = dict()
864             cross_testbed = self._testbeds[cross_testbed_guid]
865             for cross_guid in guid_list:
866                 elem_cross_data = dict(
867                     _guid = cross_guid,
868                     _testbed_guid = cross_testbed_guid,
869                     _testbed_id = cross_testbed.testbed_id,
870                     _testbed_version = cross_testbed.testbed_version)
871                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
872                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
873                 for attr_name in attribute_list:
874                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
875                     elem_cross_data[attr_name] = attr_value
876         
877         # undefer all values - we'll have to serialize them probably later
878         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
879             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
880                 for attr_name, attr_value in elem_cross_data.iteritems():
881                     elem_cross_data[attr_name] = _undefer(attr_value)
882         
883         return cross_data
884