Document the recovery procedure
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15
# Matches a netref placeholder of the form
#   {#[<label>]<expr>#}
# where <expr> is an optional component selector (".addr[N]", ".route[N]"
# or ".trace[N]"), one separator character and "[<attribute>]".
# NOTE(review): the separator before "[<attribute>]" is an unescaped "."
# (it matches any character) - presumably a literal dot was intended; confirm.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# Template used to rewrite a label-based netref into its guid-based form.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a component such as "addr[0]" into its kind ("addr") and index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19
20 def _undefer(deferred):
21     if hasattr(deferred, '_get'):
22         return deferred._get()
23     else:
24         return deferred
25
26
class TestbedController(object):
    """
    Abstract interface of a testbed controller.

    Apart from the constructor and the identification properties, every
    operation raises NotImplementedError and has to be provided by the
    concrete testbed implementation.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_version = testbed_version
        self._testbed_id = testbed_id

    @property
    def testbed_id(self):
        """Identifier given to the constructor."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Version given to the constructor."""
        return self._testbed_version

    @property
    def guids(self):
        """Iterable with the guids of the elements held by this testbed."""
        raise NotImplementedError

    # -- deferred programming of the testbed ------------------------------

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs the creation of an element"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1,
            guid2, connector_type_name2):
        """Instructs the creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, guid, connector_type_name,
            cross_guid, cross_testbed_guid, cross_testbed_id,
            cross_factory_id, cross_connector_type_name):
        """
        Instructs the creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric=0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    # -- deployment steps --------------------------------------------------

    def do_setup(self):
        """After do_setup the initial configuration of the testbed is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and their
        attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed
        elements are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs: after connection, before cross
        connections. Useful for early stages of configuration, for setting
        up whatever might be required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init the initiation of all external
        connections between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl the completion of all external
        connections between different testbed elements is performed
        """
        raise NotImplementedError

    # -- life cycle ---------------------------------------------------------

    def start(self):
        """Start the testbed (abstract)."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed (abstract)."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the
        controller instance will be re-created and the following sequence
        invoked:

            do_setup
            *recover*
            <cross-connection methods>

        Start will not be called, and after the cross connection
        invocations the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    # -- element access ------------------------------------------------------

    def set(self, guid, name, value, time=TIME_NOW):
        """Set attribute `name` of element `guid` to `value` (abstract)."""
        raise NotImplementedError

    def get(self, guid, name, time=TIME_NOW):
        """Return attribute `name` of element `guid` (abstract)."""
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags=None, exclude=False):
        """
        Return the attribute list of element `guid` (abstract).

        The meaning of `filter_flags` and `exclude` is defined by the
        implementations.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id of element `guid` (abstract)."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Abstract; semantics are defined by the implementations."""
        raise NotImplementedError

    def status(self, guid):
        """Return the status of element `guid` (abstract)."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return `attribute` of trace `trace_id` of element `guid` (abstract)."""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Shut the testbed down (abstract)."""
        raise NotImplementedError
219
220 class ExperimentController(object):
221     def __init__(self, experiment_xml, root_dir):
222         self._experiment_design_xml = experiment_xml
223         self._experiment_execute_xml = None
224         self._testbeds = dict()
225         self._deployment_config = dict()
226         self._netrefs = collections.defaultdict(set)
227         self._testbed_netrefs = collections.defaultdict(set)
228         self._cross_data = dict()
229         self._root_dir = root_dir
230         self._netreffed_testbeds = set()
231         self._guids_in_testbed_cache = dict()
232         
233         if experiment_xml is None and root_dir is not None:
234             # Recover
235             self.load_experiment_xml()
236             self.load_execute_xml()
237         else:
238             self.persist_experiment_xml()
239
240     @property
241     def experiment_design_xml(self):
242         return self._experiment_design_xml
243
244     @property
245     def experiment_execute_xml(self):
246         return self._experiment_execute_xml
247
248     @property
249     def guids(self):
250         guids = list()
251         for testbed_guid in self._testbeds.keys():
252             _guids = self._guids_in_testbed(testbed_guid)
253             if _guids:
254                 guids.extend(_guids)
255         return guids
256
257     def persist_experiment_xml(self):
258         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
259         f = open(xml_path, "w")
260         f.write(self._experiment_design_xml)
261         f.close()
262
263     def persist_execute_xml(self):
264         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
265         f = open(xml_path, "w")
266         f.write(self._experiment_execute_xml)
267         f.close()
268
269     def load_experiment_xml(self):
270         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
271         f = open(xml_path, "r")
272         self._experiment_design_xml = f.read()
273         f.close()
274
275     def load_execute_xml(self):
276         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
277         f = open(xml_path, "r")
278         self._experiment_execute_xml = f.read()
279         f.close()
280
281     def trace(self, guid, trace_id, attribute='value'):
282         testbed = self._testbed_for_guid(guid)
283         if testbed != None:
284             return testbed.trace(guid, trace_id, attribute)
285         raise RuntimeError("No element exists with guid %d" % guid)    
286
287     def traces_info(self):
288         traces_info = dict()
289         for guid, testbed in self._testbeds.iteritems():
290             tinfo = testbed.traces_info()
291             if tinfo:
292                 traces_info[guid] = testbed.traces_info()
293         return traces_info
294
295     @staticmethod
296     def _parallel(callables):
297         excs = []
298         def wrap(callable):
299             @functools.wraps(callable)
300             def wrapped(*p, **kw):
301                 try:
302                     callable(*p, **kw)
303                 except:
304                     import traceback
305                     traceback.print_exc(file=sys.stderr)
306                     excs.append(sys.exc_info())
307             return wrapped
308         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
309         for thread in threads:
310             thread.start()
311         for thread in threads:
312             thread.join()
313         for exc in excs:
314             eTyp, eVal, eLoc = exc
315             raise eTyp, eVal, eLoc
316
317     def start(self):
318         self._start()
319
    def _start(self, recover = False):
        """
        Deploy the experiment and start its testbeds.

        The experiment description is parsed from the execute XML when
        `recover` is true and from the design XML otherwise. Testbed
        controllers are then instantiated and taken through setup, create,
        connect and preconfigure; netrefs are resolved; cross connections
        are programmed and performed; and finally the testbeds that were
        (re)started are started.

        In recovery mode, the testbeds reported as recoverable only get
        do_setup() and recover() called, and the execute XML is not
        regenerated.
        """
        parser = XmlExperimentParser()
        
        if recover:
            xml = self._experiment_execute_xml
        else:
            xml = self._experiment_design_xml
        data = parser.from_xml_to_data(xml)

        # instantiate testbed controllers
        to_recover, to_restart = self._init_testbed_controllers(data, recover)
        # accumulates the guids of every testbed that goes through the
        # full deployment sequence (both rounds below)
        all_restart = set(to_restart)
        
        if not recover:
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()
        else:
            # recover recoverable controllers
            for guid in to_recover:
                self._testbeds[guid].do_setup()
                self._testbeds[guid].recover()
        
        def steps_to_configure(self, allowed_guids):
            # Runs the deployment steps up to do_preconfigure, restricted
            # to the testbeds whose guid is in allowed_guids. Each step is
            # run in parallel and completed on all of them before the
            # next step begins.

            # perform setup in parallel for all test beds,
            # wait for all threads to finish
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        steps_to_configure(self, to_restart)

        if self._netreffed_testbeds:
            # initially resolve netrefs
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            # NOTE(review): this local copy is never read afterwards
            netreffed_testbeds = set(self._netreffed_testbeds)

            to_recover, to_restart = self._init_testbed_controllers(data, recover)
            all_restart.update(to_restart)
            
            if not recover:
                # persist testbed connection data, for potential recovery
                self._persist_testbed_proxies()
            else:
                # recover recoverable controllers
                for guid in to_recover:
                    self._testbeds[guid].do_setup()
                    self._testbeds[guid].recover()

            # configure dependent testbeds
            steps_to_configure(self, to_restart)
        
        # from here on all_restart holds controllers instead of guids
        all_restart = [ self._testbeds[guid] for guid in all_restart ]
            
        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
       
        # Only now, that netref dependencies have been solved, it is safe to
        # program cross_connections
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure
                        for testbed in all_restart])

        self._clear_caches()

        #print >>sys.stderr, "DO IT"
        #import time
        #time.sleep(60)
        
        # cross-connect (cannot be done in parallel)
        # note: this runs over every known testbed, not only all_restart
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._parallel([testbed.do_prestart
                        for testbed in all_restart])

        self._clear_caches()
        
        if not recover:
            # update execution xml with execution-specific values
            # TODO: BUG! Buggy code! Cannot stand serializing all attribute values (e.g. tun_key, which is non-ascii)
            self._update_execute_xml()
            self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start
                        for testbed in all_restart])

        self._clear_caches()
439
440     def _clear_caches(self):
441         # Cleaning cache for safety.
442         self._guids_in_testbed_cache = dict()
443
444     def _persist_testbed_proxies(self):
445         TRANSIENT = (DC.RECOVER,)
446         
447         # persist access configuration for all testbeds, so that
448         # recovery mode can reconnect to them if it becomes necessary
449         conf = ConfigParser.RawConfigParser()
450         for testbed_guid, testbed_config in self._deployment_config.iteritems():
451             testbed_guid = str(testbed_guid)
452             conf.add_section(testbed_guid)
453             for attr in testbed_config.get_attribute_list():
454                 if attr not in TRANSIENT:
455                     conf.set(testbed_guid, attr, 
456                         testbed_config.get_attribute_value(attr))
457         
458         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
459         conf.write(f)
460         f.close()
461     
462     def _load_testbed_proxies(self):
463         TYPEMAP = {
464             Attribute.STRING : 'get',
465             Attribute.BOOL : 'getboolean',
466             Attribute.ENUM : 'get',
467             Attribute.DOUBLE : 'getfloat',
468             Attribute.INTEGER : 'getint',
469         }
470         
471         TRANSIENT = (DC.RECOVER,)
472         
473         # deferred import because proxy needs
474         # our class definitions to define proxies
475         import nepi.util.proxy as proxy
476         
477         conf = ConfigParser.RawConfigParser()
478         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
479         for testbed_guid in conf.sections():
480             testbed_config = proxy.AccessConfiguration()
481             testbed_guid = str(testbed_guid)
482             for attr in testbed_config.get_attribute_list():
483                 if attr not in TRANSIENT:
484                     getter = getattr(conf, TYPEMAP.get(
485                         testbed_config.get_attribute_type(attr),
486                         'get') )
487                     testbed_config.set_attribute_value(
488                         attr, getter(testbed_guid, attr))
489     
490     def _unpersist_testbed_proxies(self):
491         try:
492             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
493         except:
494             # Just print exceptions, this is just cleanup
495             import traceback
496             ######## BUG ##########
497             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
498             #traceback.print_exc(file=sys.stderr)
499
500     def _update_execute_xml(self):
501         # For all testbeds,
502         #   For all elements in testbed,
503         #       - gather immutable execute-readable attribuets lists
504         #         asynchronously
505         # Generate new design description from design xml
506         # (Wait for attributes lists - implicit syncpoint)
507         # For all testbeds,
508         #   For all elements in testbed,
509         #       - gather all immutable execute-readable attribute
510         #         values, asynchronously
511         # (Wait for attribute values - implicit syncpoint)
512         # For all testbeds,
513         #   For all elements in testbed,
514         #       - inject non-None values into new design
515         # Generate execute xml from new design
516
517         attribute_lists = dict(
518             (testbed_guid, collections.defaultdict(dict))
519             for testbed_guid in self._testbeds
520         )
521         
522         for testbed_guid, testbed in self._testbeds.iteritems():
523             guids = self._guids_in_testbed(testbed_guid)
524             for guid in guids:
525                 attribute_lists[testbed_guid][guid] = \
526                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
527         
528         parser = XmlExperimentParser()
529         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
530
531         attribute_values = dict(
532             (testbed_guid, collections.defaultdict(dict))
533             for testbed_guid in self._testbeds
534         )
535         
536         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
537             testbed = self._testbeds[testbed_guid]
538             for guid, attribute_list in testbed_attribute_lists.iteritems():
539                 attribute_list = _undefer(attribute_list)
540                 attribute_values[testbed_guid][guid] = dict(
541                     (attribute, testbed.get_deferred(guid, attribute))
542                     for attribute in attribute_list
543                 )
544         
545         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
546             for guid, attribute_values in testbed_attribute_values.iteritems():
547                 for attribute, value in attribute_values.iteritems():
548                     value = _undefer(value)
549                     if value is not None:
550                         execute_data.add_attribute_data(guid, attribute, value)
551         
552         self._experiment_execute_xml = parser.to_xml(data=execute_data)
553
554     def stop(self):
555        for testbed in self._testbeds.values():
556            testbed.stop()
557        self._unpersist_testbed_proxies()
558    
559     def recover(self):
560         # reload perviously persisted testbed access configurations
561         self._load_testbed_proxies()
562
563         self._start(recover = True)
564
565     def is_finished(self, guid):
566         testbed = self._testbed_for_guid(guid)
567         if testbed != None:
568             return testbed.status(guid) == AS.STATUS_FINISHED
569         raise RuntimeError("No element exists with guid %d" % guid)    
570
571     def status(self, guid):
572         testbed = self._testbed_for_guid(guid)
573         if testbed != None:
574             return testbed.status(guid)
575         raise RuntimeError("No element exists with guid %d" % guid)    
576
577     def set(self, guid, name, value, time = TIME_NOW):
578         testbed = self._testbed_for_guid(guid)
579         if testbed != None:
580             testbed.set(guid, name, value, time)
581         else:
582             raise RuntimeError("No element exists with guid %d" % guid)    
583
584     def get(self, guid, name, time = TIME_NOW):
585         testbed = self._testbed_for_guid(guid)
586         if testbed != None:
587             return testbed.get(guid, name, time)
588         raise RuntimeError("No element exists with guid %d" % guid)    
589
590     def get_deferred(self, guid, name, time = TIME_NOW):
591         testbed = self._testbed_for_guid(guid)
592         if testbed != None:
593             return testbed.get_deferred(guid, name, time)
594         raise RuntimeError("No element exists with guid %d" % guid)    
595
596     def get_factory_id(self, guid):
597         testbed = self._testbed_for_guid(guid)
598         if testbed != None:
599             return testbed.get_factory_id(guid)
600         raise RuntimeError("No element exists with guid %d" % guid)    
601
602     def get_testbed_id(self, guid):
603         testbed = self._testbed_for_guid(guid)
604         if testbed != None:
605             return testbed.testbed_id
606         raise RuntimeError("No element exists with guid %d" % guid)    
607
608     def get_testbed_version(self, guid):
609         testbed = self._testbed_for_guid(guid)
610         if testbed != None:
611             return testbed.testbed_version
612         raise RuntimeError("No element exists with guid %d" % guid)    
613
614     def shutdown(self):
615         exceptions = list()
616         for testbed in self._testbeds.values():
617             try:
618                 testbed.shutdown()
619             except:
620                 exceptions.append(sys.exc_info())
621         for exc_info in exceptions:
622             raise exc_info[0], exc_info[1], exc_info[2]
623
624     def _testbed_for_guid(self, guid):
625         for testbed_guid in self._testbeds.keys():
626             if guid in self._guids_in_testbed(testbed_guid):
627                 return self._testbeds[testbed_guid]
628         return None
629
630     def _guids_in_testbed(self, testbed_guid):
631         if testbed_guid not in self._testbeds:
632             return set()
633         if testbed_guid not in self._guids_in_testbed_cache:
634             self._guids_in_testbed_cache[testbed_guid] = \
635                 set(self._testbeds[testbed_guid].guids)
636         return self._guids_in_testbed_cache[testbed_guid]
637
638     @staticmethod
639     def _netref_component_split(component):
640         match = COMPONENT_PATTERN.match(component)
641         if match:
642             return match.group("kind"), match.group("index")
643         else:
644             return component, None
645
646     _NETREF_COMPONENT_GETTERS = {
647         'addr':
648             lambda testbed, guid, index, name: 
649                 testbed.get_address(guid, int(index), name),
650         'route' :
651             lambda testbed, guid, index, name: 
652                 testbed.get_route(guid, int(index), name),
653         'trace' :
654             lambda testbed, guid, index, name: 
655                 testbed.trace(guid, index, name),
656         '' : 
657             lambda testbed, guid, index, name: 
658                 testbed.get(guid, name),
659     }
660     
661     def resolve_netref_value(self, value, failval = None):
662         match = ATTRIBUTE_PATTERN_BASE.search(value)
663         if match:
664             label = match.group("label")
665             if label.startswith('GUID-'):
666                 ref_guid = int(label[5:])
667                 if ref_guid:
668                     expr = match.group("expr")
669                     component = (match.group("component") or "")[1:] # skip the dot
670                     attribute = match.group("attribute")
671                     
672                     # split compound components into component kind and index
673                     # eg: 'addr[0]' -> ('addr', '0')
674                     component, component_index = self._netref_component_split(component)
675
676                     # find object and resolve expression
677                     for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
678                         if component not in self._NETREF_COMPONENT_GETTERS:
679                             raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
680                         elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
681                             pass
682                         else:
683                             ref_value = self._NETREF_COMPONENT_GETTERS[component](
684                                 ref_testbed, ref_guid, component_index, attribute)
685                             if ref_value:
686                                 return value.replace(match.group(), ref_value)
687         # couldn't find value
688         return failval
689     
690     def do_netrefs(self, data, fail_if_undefined = False):
691         # element netrefs
692         for (testbed_guid, guid), attrs in self._netrefs.items():
693             testbed = self._testbeds.get(testbed_guid)
694             if testbed is not None:
695                 for name in set(attrs):
696                     value = testbed.get(guid, name)
697                     if isinstance(value, basestring):
698                         ref_value = self.resolve_netref_value(value)
699                         if ref_value is not None:
700                             testbed.set(guid, name, ref_value)
701                             attrs.remove(name)
702                         elif fail_if_undefined:
703                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
704                 if not attrs:
705                     del self._netrefs[(testbed_guid, guid)]
706         
707         # testbed netrefs
708         for testbed_guid, attrs in self._testbed_netrefs.items():
709             tb_data = dict(data.get_attribute_data(testbed_guid))
710             if data:
711                 for name in set(attrs):
712                     value = tb_data.get(name)
713                     if isinstance(value, basestring):
714                         ref_value = self.resolve_netref_value(value)
715                         if ref_value is not None:
716                             data.set_attribute_data(testbed_guid, name, ref_value)
717                             attrs.remove(name)
718                         elif fail_if_undefined:
719                             raise ValueError, "Unresolvable netref in: %r" % (value,)
720                 if not attrs:
721                     del self._testbed_netrefs[testbed_guid]
722         
723
    def _init_testbed_controllers(self, data, recover = False):
        """
        Create the controllers of the testbeds in `data` that do not have
        one yet, and program the elements of the testbeds that are not
        already programmed.

        Returns the tuple (to_recover, to_restart) of testbed guid sets:

          - to_restart: testbeds created normally (non-recovery mode), or
            re-created under the DC.POLICY_RESTART recovery policy
          - to_recover: testbeds re-created under the DC.POLICY_RECOVER
            recovery policy

        Raises RuntimeError if an element label is not unique. A failure
        to create a controller is re-raised unless `recover` is set and
        the testbed's recovery policy is recover or restart.
        """
        # testbeds that already had a controller before this call
        blacklist_testbeds = set(self._testbeds)
        element_guids = list()
        # label -> element guid
        label_guids = dict()
        data_guids = data.guids
        to_recover = set()
        to_restart = set()

        # gather label associations
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                # NOTE(review): testbed_guid and factory_id are not used
                # in this loop
                (testbed_guid, factory_id) = data.get_box_data(guid)
                label = data.get_attribute_data(guid, "label")
                if label is not None:
                    if label in label_guids:
                        raise RuntimeError, "Label %r is not unique" % (label,)
                    label_guids[label] = guid

        # create testbed controllers
        for guid in data_guids:
            if data.is_testbed_data(guid):
                if guid not in self._testbeds:
                    try:
                        self._create_testbed_controller(
                            guid, data, element_guids, recover)
                        if recover:
                            # Already programmed
                            blacklist_testbeds.add(guid)
                        else:
                            to_restart.add(guid)
                    except:
                        if recover:
                            # creation in recovery mode failed: fall back
                            # on the recovery policy of the testbed
                            policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
                            if policy == DC.POLICY_FAIL:
                                raise
                            elif policy == DC.POLICY_RECOVER:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_recover.add(guid)
                            elif policy == DC.POLICY_RESTART:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_restart.add(guid)
                            else:
                                # unknown policy: propagate the failure
                                raise
                        else:
                            raise
        
        # queue programmable elements
        #  - that have not been programmed already (blacklist_testbeds)
        #  - including recovered or restarted testbeds
        #  - but those that have no unresolved netrefs
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                (testbed_guid, factory_id) = data.get_box_data(guid)
                if testbed_guid not in blacklist_testbeds:
                    element_guids.append(guid)

        # replace references to element labels with their guids
        self._resolve_labels(data, data_guids, label_guids)
    
        # program testbed controllers
        if element_guids:
            self._program_testbed_controllers(element_guids, data)
        
        return to_recover, to_restart
790
791     def _resolve_labels(self, data, data_guids, label_guids):
792         netrefs = self._netrefs
793         testbed_netrefs = self._testbed_netrefs
794         for guid in data_guids:
795             for name, value in data.get_attribute_data(guid):
796                 if isinstance(value, basestring):
797                     match = ATTRIBUTE_PATTERN_BASE.search(value)
798                     if match:
799                         label = match.group("label")
800                         if not label.startswith('GUID-'):
801                             ref_guid = label_guids.get(label)
802                             if ref_guid is not None:
803                                 value = ATTRIBUTE_PATTERN_BASE.sub(
804                                     ATTRIBUTE_PATTERN_GUID_SUB % dict(
805                                         guid = 'GUID-%d' % (ref_guid,),
806                                         expr = match.group("expr"),
807                                         label = label), 
808                                     value)
809                                 data.set_attribute_data(guid, name, value)
810                                 
811                                 # memorize which guid-attribute pairs require
812                                 # postprocessing, to avoid excessive controller-testbed
813                                 # communication at configuration time
814                                 # (which could require high-latency network I/O)
815                                 if not data.is_testbed_data(guid):
816                                     (testbed_guid, factory_id) = data.get_box_data(guid)
817                                     netrefs[(testbed_guid, guid)].add(name)
818                                 else:
819                                     testbed_netrefs[guid].add(name)
820
821     def _create_testbed_controller(self, guid, data, element_guids, recover):
822         (testbed_id, testbed_version) = data.get_testbed_data(guid)
823         deployment_config = self._deployment_config.get(guid)
824         
825         # deferred import because proxy needs
826         # our class definitions to define proxies
827         import nepi.util.proxy as proxy
828         
829         if deployment_config is None:
830             # need to create one
831             deployment_config = proxy.AccessConfiguration()
832             
833             for (name, value) in data.get_attribute_data(guid):
834                 if value is not None and deployment_config.has_attribute(name):
835                     # if any deployment config attribute has a netref, we can't
836                     # create this controller yet
837                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
838                         # remember to re-issue this one
839                         self._netreffed_testbeds.add(guid)
840                         return
841                     
842                     # copy deployment config attribute
843                     deployment_config.set_attribute_value(name, value)
844             
845             # commit config
846             self._deployment_config[guid] = deployment_config
847         
848         if deployment_config is not None:
849             # force recovery mode 
850             deployment_config.set_attribute_value("recover",recover)
851         
852         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
853                 deployment_config)
854         for (name, value) in data.get_attribute_data(guid):
855             testbed.defer_configure(name, value)
856         self._testbeds[guid] = testbed
857         if guid in self._netreffed_testbeds:
858             self._netreffed_testbeds.remove(guid)
859
860     def _program_testbed_controllers(self, element_guids, data):
861         def resolve_create_netref(data, guid, name, value): 
862             # Try to resolve create-time netrefs, if possible
863             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
864                 try:
865                     nuvalue = self.resolve_netref_value(value)
866                 except:
867                     # Any trouble means we're not in shape to resolve the netref yet
868                     nuvalue = None
869                 if nuvalue is not None:
870                     # Only if we succeed we remove the netref deferral entry
871                     value = nuvalue
872                     data.set_attribute_data(guid, name, value)
873                     if (testbed_guid, guid) in self._netrefs:
874                         self._netrefs[(testbed_guid, guid)].discard(name)
875             return value
876
877         for guid in element_guids:
878             (testbed_guid, factory_id) = data.get_box_data(guid)
879             testbed = self._testbeds.get(testbed_guid)
880             if testbed is not None:
881                 # create
882                 testbed.defer_create(guid, factory_id)
883                 # set attributes
884                 for (name, value) in data.get_attribute_data(guid):
885                     value = resolve_create_netref(data, guid, name, value)
886                     testbed.defer_create_set(guid, name, value)
887
888         for guid in element_guids:
889             (testbed_guid, factory_id) = data.get_box_data(guid)
890             testbed = self._testbeds.get(testbed_guid)
891             if testbed is not None:
892                 # traces
893                 for trace_id in data.get_trace_data(guid):
894                     testbed.defer_add_trace(guid, trace_id)
895                 # addresses
896                 for (address, netprefix, broadcast) in data.get_address_data(guid):
897                     if address != None:
898                         testbed.defer_add_address(guid, address, netprefix, 
899                                 broadcast)
900                 # routes
901                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
902                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
903                 # store connections data
904                 for (connector_type_name, other_guid, other_connector_type_name) \
905                         in data.get_connection_data(guid):
906                     (other_testbed_guid, other_factory_id) = data.get_box_data(
907                             other_guid)
908                     if testbed_guid == other_testbed_guid:
909                         # each testbed should take care of enforcing internal
910                         # connection simmetry, so each connection is only
911                         # added in one direction
912                         testbed.defer_connect(guid, connector_type_name, 
913                                 other_guid, other_connector_type_name)
914
915     def _program_testbed_cross_connections(self, data):
916         data_guids = data.guids
917         for guid in data_guids: 
918             if not data.is_testbed_data(guid):
919                 (testbed_guid, factory_id) = data.get_box_data(guid)
920                 testbed = self._testbeds.get(testbed_guid)
921                 if testbed is not None:
922                     for (connector_type_name, cross_guid, cross_connector_type_name) \
923                             in data.get_connection_data(guid):
924                         (testbed_guid, factory_id) = data.get_box_data(guid)
925                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
926                                 cross_guid)
927                         if testbed_guid != cross_testbed_guid:
928                             cross_testbed = self._testbeds[cross_testbed_guid]
929                             cross_testbed_id = cross_testbed.testbed_id
930                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
931                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
932                                     cross_connector_type_name)
933                             # save cross data for later
934                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
935                                     cross_guid)
936
937     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
938         if testbed_guid not in self._cross_data:
939             self._cross_data[testbed_guid] = dict()
940         if cross_testbed_guid not in self._cross_data[testbed_guid]:
941             self._cross_data[testbed_guid][cross_testbed_guid] = set()
942         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
943
944     def _get_cross_data(self, testbed_guid):
945         cross_data = dict()
946         if not testbed_guid in self._cross_data:
947             return cross_data
948
949         # fetch attribute lists in one batch
950         attribute_lists = dict()
951         for cross_testbed_guid, guid_list in \
952                 self._cross_data[testbed_guid].iteritems():
953             cross_testbed = self._testbeds[cross_testbed_guid]
954             for cross_guid in guid_list:
955                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
956                     cross_testbed.get_attribute_list_deferred(cross_guid)
957
958         # fetch attribute values in another batch
959         for cross_testbed_guid, guid_list in \
960                 self._cross_data[testbed_guid].iteritems():
961             cross_data[cross_testbed_guid] = dict()
962             cross_testbed = self._testbeds[cross_testbed_guid]
963             for cross_guid in guid_list:
964                 elem_cross_data = dict(
965                     _guid = cross_guid,
966                     _testbed_guid = cross_testbed_guid,
967                     _testbed_id = cross_testbed.testbed_id,
968                     _testbed_version = cross_testbed.testbed_version)
969                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
970                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
971                 for attr_name in attribute_list:
972                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
973                     elem_cross_data[attr_name] = attr_value
974         
975         # undefer all values - we'll have to serialize them probably later
976         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
977             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
978                 for attr_name, attr_value in elem_cross_data.iteritems():
979                     elem_cross_data[attr_name] = _undefer(attr_value)
980         
981         return cross_data
982