Merge with head
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15
# Matches a "netref" placeholder embedded in an attribute value, of the form
#   {#[<label>]<expr>#}
# where <expr> is an optional ".addr[N]", ".route[N]" or ".trace[N]" component
# followed by ".[<attribute>]".  Named groups: label, expr, component, attribute.
# NOTE(review): the character right before "[<attribute>]" is matched with an
# unescaped "." (any character), presumably meant to be a literal dot - confirm.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template used to rewrite a netref so that its label slot holds a guid
# reference; it is filled from a mapping providing "guid" and "expr".
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a component such as "addr[0]" into kind ("addr") and index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19
20 def _undefer(deferred):
21     if hasattr(deferred, '_get'):
22         return deferred._get()
23     else:
24         return deferred
25
26
class TestbedController(object):
    """Interface of a testbed controller.

    Only the constructor and the ``testbed_id`` / ``testbed_version``
    properties are implemented here; every other member raises
    NotImplementedError and has to be provided by a concrete testbed.
    """

    def __init__(self, testbed_id, testbed_version):
        """Store the testbed identifier and version."""
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction time."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction time."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements handled by this testbed (not implemented here)."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element """
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and 
        their attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed elements
        are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Not implemented in this base class."""
        raise NotImplementedError

    def stop(self):
        """Not implemented in this base class."""
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute ``name`` of element ``guid`` (not implemented here)."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Get attribute ``name`` of element ``guid`` (not implemented here)."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """Not implemented in this base class."""
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Not implemented in this base class."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Not implemented in this base class."""
        raise NotImplementedError

    def status(self, guid):
        """Not implemented in this base class."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Not implemented in this base class."""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Not implemented in this base class."""
        raise NotImplementedError
205
206 class ExperimentController(object):
207     def __init__(self, experiment_xml, root_dir):
208         self._experiment_design_xml = experiment_xml
209         self._experiment_execute_xml = None
210         self._testbeds = dict()
211         self._deployment_config = dict()
212         self._netrefs = collections.defaultdict(set)
213         self._testbed_netrefs = collections.defaultdict(set)
214         self._cross_data = dict()
215         self._root_dir = root_dir
216         self._netreffed_testbeds = set()
217         self._guids_in_testbed_cache = dict()
218
219         self.persist_experiment_xml()
220
    @property
    def experiment_design_xml(self):
        """Design XML handed to the constructor."""
        return self._experiment_design_xml
224
    @property
    def experiment_execute_xml(self):
        """Execution XML; None until _update_execute_xml() has generated it."""
        return self._experiment_execute_xml
228
229     @property
230     def guids(self):
231         guids = list()
232         for testbed_guid in self._testbeds.keys():
233             _guids = self._guids_in_testbed(testbed_guid)
234             if _guids:
235                 guids.extend(_guids)
236         return guids
237
238     def persist_experiment_xml(self):
239         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
240         f = open(xml_path, "w")
241         f.write(self._experiment_design_xml)
242         f.close()
243
244     def persist_execute_xml(self):
245         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
246         f = open(xml_path, "w")
247         f.write(self._experiment_execute_xml)
248         f.close()
249
250     def trace(self, guid, trace_id, attribute='value'):
251         testbed = self._testbed_for_guid(guid)
252         if testbed != None:
253             return testbed.trace(guid, trace_id, attribute)
254         raise RuntimeError("No element exists with guid %d" % guid)    
255
256     def traces_info(self):
257         traces_info = dict()
258         for guid, testbed in self._testbeds.iteritems():
259             tinfo = testbed.traces_info()
260             if tinfo:
261                 traces_info[guid] = testbed.traces_info()
262         return traces_info
263
264     @staticmethod
265     def _parallel(callables):
266         excs = []
267         def wrap(callable):
268             @functools.wraps(callable)
269             def wrapped(*p, **kw):
270                 try:
271                     callable(*p, **kw)
272                 except:
273                     import traceback
274                     traceback.print_exc(file=sys.stderr)
275                     excs.append(sys.exc_info())
276             return wrapped
277         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
278         for thread in threads:
279             thread.start()
280         for thread in threads:
281             thread.join()
282         for exc in excs:
283             eTyp, eVal, eLoc = exc
284             raise eTyp, eVal, eLoc
285
    def start(self):
        """Deploy and start the experiment described by the design XML.

        Sequence: parse the design, create and program the testbed
        controllers, run the setup/create/connect/preconfigure steps, resolve
        netrefs (creating any testbed whose own configuration depended on
        one), program and perform cross connections, run the final
        configuration steps, write the execution XML and start all testbeds.
        """
        parser = XmlExperimentParser()
        data = parser.from_xml_to_data(self._experiment_design_xml)

        # instantiate testbed controllers
        self._init_testbed_controllers(data)
        
        # persist testbed connection data, for potential recovery
        self._persist_testbed_proxies()
        
        def steps_to_configure(self, allowed_guids):
            # Runs the steps up to do_preconfigure, restricted to the testbeds
            # whose guid is in allowed_guids.

            # perform setup in parallel for all testbeds,
            # wait for all threads to finish
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        steps_to_configure(self, self._testbeds)

        if self._netreffed_testbeds:
            # initially resolve netrefs
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            # (copy: _init_testbed_controllers removes entries from the
            # original set as it creates the pending controllers)
            netreffed_testbeds = set(self._netreffed_testbeds)

            self._init_testbed_controllers(data)
            
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()

            # configure dependent testbeds
            steps_to_configure(self, netreffed_testbeds)
            
        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
       
        # Only now, that netref dependencies have been solved, it is safe to
        # program cross_connections
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure
                        for testbed in self._testbeds.itervalues()])

        self._clear_caches()

        # cross-connect (cannot be done in parallel)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._parallel([testbed.do_prestart
                        for testbed in self._testbeds.itervalues()])

        self._clear_caches()
        
        # update execution xml with execution-specific values
        # TODO: known bug - serialization cannot cope with every attribute
        # value (e.g. tun_key, which is non-ASCII)
        self._update_execute_xml()
        self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start
                        for testbed in self._testbeds.itervalues()])

        self._clear_caches()
383
384     def _clear_caches(self):
385         # Cleaning cache for safety.
386         self._guids_in_testbed_cache = dict()
387
388     def _persist_testbed_proxies(self):
389         TRANSIENT = ('Recover',)
390         
391         # persist access configuration for all testbeds, so that
392         # recovery mode can reconnect to them if it becomes necessary
393         conf = ConfigParser.RawConfigParser()
394         for testbed_guid, testbed_config in self._deployment_config.iteritems():
395             testbed_guid = str(testbed_guid)
396             conf.add_section(testbed_guid)
397             for attr in testbed_config.get_attribute_list():
398                 if attr not in TRANSIENT:
399                     conf.set(testbed_guid, attr, 
400                         testbed_config.get_attribute_value(attr))
401         
402         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
403         conf.write(f)
404         f.close()
405     
406     def _load_testbed_proxies(self):
407         TYPEMAP = {
408             STRING : 'get',
409             INTEGER : 'getint',
410             FLOAT : 'getfloat',
411             BOOLEAN : 'getboolean',
412         }
413         
414         # deferred import because proxy needs
415         # our class definitions to define proxies
416         import nepi.util.proxy as proxy
417         
418         conf = ConfigParser.RawConfigParser()
419         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
420         for testbed_guid in conf.sections():
421             testbed_config = proxy.AccessConfiguration()
422             for attr in conf.options(testbed_guid):
423                 testbed_config.set_attribute_value(attr, 
424                     conf.get(testbed_guid, attr) )
425                 
426             testbed_guid = str(testbed_guid)
427             conf.add_section(testbed_guid)
428             for attr in testbed_config.get_attribute_list():
429                 if attr not in TRANSIENT:
430                     getter = getattr(conf, TYPEMAP.get(
431                         testbed_config.get_attribute_type(attr),
432                         'get') )
433                     testbed_config.set_attribute_value(
434                         testbed_guid, attr, getter(attr))
435     
436     def _unpersist_testbed_proxies(self):
437         try:
438             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
439         except:
440             # Just print exceptions, this is just cleanup
441             import traceback
442             ######## BUG ##########
443             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
444             #traceback.print_exc(file=sys.stderr)
445
446     def _update_execute_xml(self):
447         # For all testbeds,
448         #   For all elements in testbed,
449         #       - gather immutable execute-readable attribuets lists
450         #         asynchronously
451         # Generate new design description from design xml
452         # (Wait for attributes lists - implicit syncpoint)
453         # For all testbeds,
454         #   For all elements in testbed,
455         #       - gather all immutable execute-readable attribute
456         #         values, asynchronously
457         # (Wait for attribute values - implicit syncpoint)
458         # For all testbeds,
459         #   For all elements in testbed,
460         #       - inject non-None values into new design
461         # Generate execute xml from new design
462
463         attribute_lists = dict(
464             (testbed_guid, collections.defaultdict(dict))
465             for testbed_guid in self._testbeds
466         )
467         
468         for testbed_guid, testbed in self._testbeds.iteritems():
469             guids = self._guids_in_testbed(testbed_guid)
470             for guid in guids:
471                 attribute_lists[testbed_guid][guid] = \
472                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
473         
474         parser = XmlExperimentParser()
475         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
476
477         attribute_values = dict(
478             (testbed_guid, collections.defaultdict(dict))
479             for testbed_guid in self._testbeds
480         )
481         
482         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
483             testbed = self._testbeds[testbed_guid]
484             for guid, attribute_list in testbed_attribute_lists.iteritems():
485                 attribute_list = _undefer(attribute_list)
486                 attribute_values[testbed_guid][guid] = dict(
487                     (attribute, testbed.get_deferred(guid, attribute))
488                     for attribute in attribute_list
489                 )
490         
491         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
492             for guid, attribute_values in testbed_attribute_values.iteritems():
493                 for attribute, value in attribute_values.iteritems():
494                     value = _undefer(value)
495                     if value is not None:
496                         execute_data.add_attribute_data(guid, attribute, value)
497         
498         self._experiment_execute_xml = parser.to_xml(data=execute_data)
499
500     def stop(self):
501        for testbed in self._testbeds.values():
502            testbed.stop()
503        self._unpersist_testbed_proxies()
504    
505     def recover(self):
506         # reload perviously persisted testbed access configurations
507         self._load_testbed_proxies()
508         
509         # recreate testbed proxies by reconnecting only
510         self._init_testbed_controllers(recover = True)
511         
512         # another time, for netrefs
513         self._init_testbed_controllers(recover = True)
514
515     def is_finished(self, guid):
516         testbed = self._testbed_for_guid(guid)
517         if testbed != None:
518             return testbed.status(guid) == AS.STATUS_FINISHED
519         raise RuntimeError("No element exists with guid %d" % guid)    
520
521     def set(self, guid, name, value, time = TIME_NOW):
522         testbed = self._testbed_for_guid(guid)
523         if testbed != None:
524             testbed.set(guid, name, value, time)
525         else:
526             raise RuntimeError("No element exists with guid %d" % guid)    
527
528     def get(self, guid, name, time = TIME_NOW):
529         testbed = self._testbed_for_guid(guid)
530         if testbed != None:
531             return testbed.get(guid, name, time)
532         raise RuntimeError("No element exists with guid %d" % guid)    
533
534     def get_deferred(self, guid, name, time = TIME_NOW):
535         testbed = self._testbed_for_guid(guid)
536         if testbed != None:
537             return testbed.get_deferred(guid, name, time)
538         raise RuntimeError("No element exists with guid %d" % guid)    
539
540     def get_factory_id(self, guid):
541         testbed = self._testbed_for_guid(guid)
542         if testbed != None:
543             return testbed.get_factory_id(guid)
544         raise RuntimeError("No element exists with guid %d" % guid)    
545
546     def get_testbed_id(self, guid):
547         testbed = self._testbed_for_guid(guid)
548         if testbed != None:
549             return testbed.testbed_id
550         raise RuntimeError("No element exists with guid %d" % guid)    
551
552     def get_testbed_version(self, guid):
553         testbed = self._testbed_for_guid(guid)
554         if testbed != None:
555             return testbed.testbed_version
556         raise RuntimeError("No element exists with guid %d" % guid)    
557
558     def shutdown(self):
559         exceptions = list()
560         for testbed in self._testbeds.values():
561             try:
562                 testbed.shutdown()
563             except:
564                 exceptions.append(sys.exc_info())
565         for exc_info in exceptions:
566             raise exc_info[0], exc_info[1], exc_info[2]
567
568     def _testbed_for_guid(self, guid):
569         for testbed_guid in self._testbeds.keys():
570             if guid in self._guids_in_testbed(testbed_guid):
571                 return self._testbeds[testbed_guid]
572         return None
573
574     def _guids_in_testbed(self, testbed_guid):
575         if testbed_guid not in self._testbeds:
576             return set()
577         if testbed_guid not in self._guids_in_testbed_cache:
578             self._guids_in_testbed_cache[testbed_guid] = \
579                 set(self._testbeds[testbed_guid].guids)
580         return self._guids_in_testbed_cache[testbed_guid]
581
582     @staticmethod
583     def _netref_component_split(component):
584         match = COMPONENT_PATTERN.match(component)
585         if match:
586             return match.group("kind"), match.group("index")
587         else:
588             return component, None
589
590     _NETREF_COMPONENT_GETTERS = {
591         'addr':
592             lambda testbed, guid, index, name: 
593                 testbed.get_address(guid, int(index), name),
594         'route' :
595             lambda testbed, guid, index, name: 
596                 testbed.get_route(guid, int(index), name),
597         'trace' :
598             lambda testbed, guid, index, name: 
599                 testbed.trace(guid, index, name),
600         '' : 
601             lambda testbed, guid, index, name: 
602                 testbed.get(guid, name),
603     }
604     
605     def resolve_netref_value(self, value, failval = None):
606         match = ATTRIBUTE_PATTERN_BASE.search(value)
607         if match:
608             label = match.group("label")
609             if label.startswith('GUID-'):
610                 ref_guid = int(label[5:])
611                 if ref_guid:
612                     expr = match.group("expr")
613                     component = (match.group("component") or "")[1:] # skip the dot
614                     attribute = match.group("attribute")
615                     
616                     # split compound components into component kind and index
617                     # eg: 'addr[0]' -> ('addr', '0')
618                     component, component_index = self._netref_component_split(component)
619
620                     # find object and resolve expression
621                     for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
622                         if component not in self._NETREF_COMPONENT_GETTERS:
623                             raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
624                         elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
625                             pass
626                         else:
627                             ref_value = self._NETREF_COMPONENT_GETTERS[component](
628                                 ref_testbed, ref_guid, component_index, attribute)
629                             if ref_value:
630                                 return value.replace(match.group(), ref_value)
631         # couldn't find value
632         return failval
633     
634     def do_netrefs(self, data, fail_if_undefined = False):
635         # element netrefs
636         for (testbed_guid, guid), attrs in self._netrefs.items():
637             testbed = self._testbeds.get(testbed_guid)
638             if testbed is not None:
639                 for name in set(attrs):
640                     value = testbed.get(guid, name)
641                     if isinstance(value, basestring):
642                         ref_value = self.resolve_netref_value(value)
643                         if ref_value is not None:
644                             testbed.set(guid, name, ref_value)
645                             attrs.remove(name)
646                         elif fail_if_undefined:
647                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
648                 if not attrs:
649                     del self._netrefs[(testbed_guid, guid)]
650         
651         # testbed netrefs
652         for testbed_guid, attrs in self._testbed_netrefs.items():
653             tb_data = dict(data.get_attribute_data(testbed_guid))
654             if data:
655                 for name in set(attrs):
656                     value = tb_data.get(name)
657                     if isinstance(value, basestring):
658                         ref_value = self.resolve_netref_value(value)
659                         if ref_value is not None:
660                             data.set_attribute_data(testbed_guid, name, ref_value)
661                             attrs.remove(name)
662                         elif fail_if_undefined:
663                             raise ValueError, "Unresolvable netref in: %r" % (value,)
664                 if not attrs:
665                     del self._testbed_netrefs[testbed_guid]
666         
667
668     def _init_testbed_controllers(self, data, recover = False):
669         blacklist_testbeds = set(self._testbeds)
670         element_guids = list()
671         label_guids = dict()
672         data_guids = data.guids
673
674         # create testbed controllers
675         for guid in data_guids:
676             if data.is_testbed_data(guid):
677                 if guid not in self._testbeds:
678                     self._create_testbed_controller(guid, data, element_guids,
679                             recover)
680             else:
681                 (testbed_guid, factory_id) = data.get_box_data(guid)
682                 if testbed_guid not in blacklist_testbeds:
683                     element_guids.append(guid)
684                     label = data.get_attribute_data(guid, "label")
685                     if label is not None:
686                         if label in label_guids:
687                             raise RuntimeError, "Label %r is not unique" % (label,)
688                         label_guids[label] = guid
689
690         # replace references to elements labels for its guid
691         self._resolve_labels(data, data_guids, label_guids)
692     
693         # program testbed controllers
694         if not recover:
695             self._program_testbed_controllers(element_guids, data)
696
697     def _resolve_labels(self, data, data_guids, label_guids):
698         netrefs = self._netrefs
699         testbed_netrefs = self._testbed_netrefs
700         for guid in data_guids:
701             for name, value in data.get_attribute_data(guid):
702                 if isinstance(value, basestring):
703                     match = ATTRIBUTE_PATTERN_BASE.search(value)
704                     if match:
705                         label = match.group("label")
706                         if not label.startswith('GUID-'):
707                             ref_guid = label_guids.get(label)
708                             if ref_guid is not None:
709                                 value = ATTRIBUTE_PATTERN_BASE.sub(
710                                     ATTRIBUTE_PATTERN_GUID_SUB % dict(
711                                         guid = 'GUID-%d' % (ref_guid,),
712                                         expr = match.group("expr"),
713                                         label = label), 
714                                     value)
715                                 data.set_attribute_data(guid, name, value)
716                                 
717                                 # memorize which guid-attribute pairs require
718                                 # postprocessing, to avoid excessive controller-testbed
719                                 # communication at configuration time
720                                 # (which could require high-latency network I/O)
721                                 if not data.is_testbed_data(guid):
722                                     (testbed_guid, factory_id) = data.get_box_data(guid)
723                                     netrefs[(testbed_guid, guid)].add(name)
724                                 else:
725                                     testbed_netrefs[guid].add(name)
726
727     def _create_testbed_controller(self, guid, data, element_guids, recover):
728         (testbed_id, testbed_version) = data.get_testbed_data(guid)
729         deployment_config = self._deployment_config.get(guid)
730         
731         # deferred import because proxy needs
732         # our class definitions to define proxies
733         import nepi.util.proxy as proxy
734         
735         if deployment_config is None:
736             # need to create one
737             deployment_config = proxy.AccessConfiguration()
738             
739             for (name, value) in data.get_attribute_data(guid):
740                 if value is not None and deployment_config.has_attribute(name):
741                     # if any deployment config attribute has a netref, we can't
742                     # create this controller yet
743                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
744                         # remember to re-issue this one
745                         self._netreffed_testbeds.add(guid)
746                         return
747                     
748                     # copy deployment config attribute
749                     deployment_config.set_attribute_value(name, value)
750             
751             # commit config
752             self._deployment_config[guid] = deployment_config
753         
754         if deployment_config is not None:
755             # force recovery mode 
756             deployment_config.set_attribute_value("recover",recover)
757         
758         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
759                 deployment_config)
760         for (name, value) in data.get_attribute_data(guid):
761             testbed.defer_configure(name, value)
762         self._testbeds[guid] = testbed
763         if guid in self._netreffed_testbeds:
764             self._netreffed_testbeds.remove(guid)
765
766     def _program_testbed_controllers(self, element_guids, data):
767         def resolve_create_netref(data, guid, name, value): 
768             # Try to resolve create-time netrefs, if possible
769             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
770                 try:
771                     nuvalue = self.resolve_netref_value(value)
772                 except:
773                     # Any trouble means we're not in shape to resolve the netref yet
774                     nuvalue = None
775                 if nuvalue is not None:
776                     # Only if we succeed we remove the netref deferral entry
777                     value = nuvalue
778                     data.set_attribute_data(guid, name, value)
779                     if (testbed_guid, guid) in self._netrefs:
780                         self._netrefs[(testbed_guid, guid)].discard(name)
781             return value
782
783         for guid in element_guids:
784             (testbed_guid, factory_id) = data.get_box_data(guid)
785             testbed = self._testbeds.get(testbed_guid)
786             if testbed is not None:
787                 # create
788                 testbed.defer_create(guid, factory_id)
789                 # set attributes
790                 for (name, value) in data.get_attribute_data(guid):
791                     value = resolve_create_netref(data, guid, name, value)
792                     testbed.defer_create_set(guid, name, value)
793
794         for guid in element_guids:
795             (testbed_guid, factory_id) = data.get_box_data(guid)
796             testbed = self._testbeds.get(testbed_guid)
797             if testbed is not None:
798                 # traces
799                 for trace_id in data.get_trace_data(guid):
800                     testbed.defer_add_trace(guid, trace_id)
801                 # addresses
802                 for (address, netprefix, broadcast) in data.get_address_data(guid):
803                     if address != None:
804                         testbed.defer_add_address(guid, address, netprefix, 
805                                 broadcast)
806                 # routes
807                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
808                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
809                 # store connections data
810                 for (connector_type_name, other_guid, other_connector_type_name) \
811                         in data.get_connection_data(guid):
812                     (other_testbed_guid, other_factory_id) = data.get_box_data(
813                             other_guid)
814                     if testbed_guid == other_testbed_guid:
815                         # each testbed should take care of enforcing internal
816                         # connection simmetry, so each connection is only
817                         # added in one direction
818                         testbed.defer_connect(guid, connector_type_name, 
819                                 other_guid, other_connector_type_name)
820
821     def _program_testbed_cross_connections(self, data):
822         data_guids = data.guids
823         for guid in data_guids: 
824             if not data.is_testbed_data(guid):
825                 (testbed_guid, factory_id) = data.get_box_data(guid)
826                 testbed = self._testbeds.get(testbed_guid)
827                 if testbed is not None:
828                     for (connector_type_name, cross_guid, cross_connector_type_name) \
829                             in data.get_connection_data(guid):
830                         (testbed_guid, factory_id) = data.get_box_data(guid)
831                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
832                                 cross_guid)
833                         if testbed_guid != cross_testbed_guid:
834                             cross_testbed = self._testbeds[cross_testbed_guid]
835                             cross_testbed_id = cross_testbed.testbed_id
836                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
837                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
838                                     cross_connector_type_name)
839                             # save cross data for later
840                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
841                                     cross_guid)
842
843     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
844         if testbed_guid not in self._cross_data:
845             self._cross_data[testbed_guid] = dict()
846         if cross_testbed_guid not in self._cross_data[testbed_guid]:
847             self._cross_data[testbed_guid][cross_testbed_guid] = set()
848         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
849
850     def _get_cross_data(self, testbed_guid):
851         cross_data = dict()
852         if not testbed_guid in self._cross_data:
853             return cross_data
854
855         # fetch attribute lists in one batch
856         attribute_lists = dict()
857         for cross_testbed_guid, guid_list in \
858                 self._cross_data[testbed_guid].iteritems():
859             cross_testbed = self._testbeds[cross_testbed_guid]
860             for cross_guid in guid_list:
861                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
862                     cross_testbed.get_attribute_list_deferred(cross_guid)
863
864         # fetch attribute values in another batch
865         for cross_testbed_guid, guid_list in \
866                 self._cross_data[testbed_guid].iteritems():
867             cross_data[cross_testbed_guid] = dict()
868             cross_testbed = self._testbeds[cross_testbed_guid]
869             for cross_guid in guid_list:
870                 elem_cross_data = dict(
871                     _guid = cross_guid,
872                     _testbed_guid = cross_testbed_guid,
873                     _testbed_id = cross_testbed.testbed_id,
874                     _testbed_version = cross_testbed.testbed_version)
875                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
876                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
877                 for attr_name in attribute_list:
878                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
879                     elem_cross_data[attr_name] = attr_value
880         
881         # undefer all values - we'll have to serialize them probably later
882         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
883             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
884                 for attr_name, attr_value in elem_cross_data.iteritems():
885                     elem_cross_data[attr_name] = _undefer(attr_value)
886         
887         return cross_data
888