Recovery policy for testbeds, and recovery implementation in PlanetLab.
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15
# Netref syntax: {#[<label>]<component>.[<attribute>]#}
#   <label>     element label (rewritten to GUID-<n> by _resolve_labels)
#   <component> optional .addr[i] / .route[i] / .trace[i] selector
#   <attribute> name of the value to fetch
# e.g. "{#[node1].addr[0].[Address]#}". The dot before "[<attribute>]" is a
# literal dot (it was previously an unescaped "." matching any character).
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template used to rebuild a netref with a GUID-<n> label in place of the
# original label, keeping the original expression.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a component such as "addr[0]" into kind ("addr") and index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19
def _undefer(deferred):
    """Return the concrete value behind ``deferred``.

    Objects that expose a ``_get`` attribute are treated as deferred results
    and resolved by calling ``_get()``; anything else is passed through as-is.
    """
    if not hasattr(deferred, '_get'):
        return deferred
    return deferred._get()
25
26
class TestbedController(object):
    """Interface that every testbed controller implements.

    The ``defer_*`` methods record instructions (configuration, element
    creation, connections, traces, addresses, routes). The ``do_*`` methods
    are the execution steps ExperimentController invokes in order: setup,
    create, connect (init/compl), preconfigure, configure, cross-connect
    (init/compl), prestart, and finally start(). Every method except the
    constructor and the two identity properties raises NotImplementedError
    here and must be overridden.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements managed by this testbed."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Record a configuration attribute for the testbed instance."""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Record the creation of element ``guid`` from ``factory_id``."""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Record an initial attribute value for an element."""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Record an attribute value for a factory."""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Record a connection between two connectors of this testbed."""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Record a connection between a connector of this testbed and a
        connector of an element that lives in another testbed instance.
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Record that trace ``trace_id`` must be enabled on ``guid``."""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Record an address to be added to ``guid``."""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Record a route to be added to ``guid``."""
        raise NotImplementedError

    def do_setup(self):
        """Perform the initial testbed configuration."""
        raise NotImplementedError

    def do_create(self):
        """
        Create every recorded element and set its initial attributes.
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        Initiate all recorded connections between elements of this testbed.
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        Complete all recorded connections between elements of this testbed.
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Early configuration stage: runs after connection and before netref
        resolution and cross connections, so it can set up anything that
        netref resolution requires.
        """
        raise NotImplementedError

    def do_configure(self):
        """Configure the elements."""
        raise NotImplementedError

    def do_prestart(self):
        """Final configuration pass, run right before start()."""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        Initiate all connections to elements of other testbed instances.
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        Complete all connections to elements of other testbed instances.
        """
        raise NotImplementedError

    def start(self):
        """Start the testbed."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed."""
        raise NotImplementedError

    def recover(self):
        """Reattach to an already-deployed testbed."""
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute ``name`` of element ``guid``."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Read attribute ``name`` of element ``guid``."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of the box to query
            index: index of the routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of the box to query
            index: index of the interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """List the attribute names of element ``guid``."""
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id element ``guid`` was created from."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Perform ``action`` on element ``guid`` at ``time``."""
        raise NotImplementedError

    def status(self, guid):
        """Return the status of element ``guid``."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return trace data for ``trace_id`` on element ``guid``."""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Release every resource held by the testbed."""
        raise NotImplementedError
208
209 class ExperimentController(object):
210     def __init__(self, experiment_xml, root_dir):
211         self._experiment_design_xml = experiment_xml
212         self._experiment_execute_xml = None
213         self._testbeds = dict()
214         self._deployment_config = dict()
215         self._netrefs = collections.defaultdict(set)
216         self._testbed_netrefs = collections.defaultdict(set)
217         self._cross_data = dict()
218         self._root_dir = root_dir
219         self._netreffed_testbeds = set()
220         self._guids_in_testbed_cache = dict()
221         
222         if experiment_xml is None and root_dir is not None:
223             # Recover
224             self.load_experiment_xml()
225             self.load_execute_xml()
226         else:
227             self.persist_experiment_xml()
228
229     @property
230     def experiment_design_xml(self):
231         return self._experiment_design_xml
232
233     @property
234     def experiment_execute_xml(self):
235         return self._experiment_execute_xml
236
237     @property
238     def guids(self):
239         guids = list()
240         for testbed_guid in self._testbeds.keys():
241             _guids = self._guids_in_testbed(testbed_guid)
242             if _guids:
243                 guids.extend(_guids)
244         return guids
245
246     def persist_experiment_xml(self):
247         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
248         f = open(xml_path, "w")
249         f.write(self._experiment_design_xml)
250         f.close()
251
252     def persist_execute_xml(self):
253         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
254         f = open(xml_path, "w")
255         f.write(self._experiment_execute_xml)
256         f.close()
257
258     def load_experiment_xml(self):
259         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
260         f = open(xml_path, "r")
261         self._experiment_design_xml = f.read()
262         f.close()
263
264     def load_execute_xml(self):
265         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
266         f = open(xml_path, "r")
267         self._experiment_execute_xml = f.read()
268         f.close()
269
270     def trace(self, guid, trace_id, attribute='value'):
271         testbed = self._testbed_for_guid(guid)
272         if testbed != None:
273             return testbed.trace(guid, trace_id, attribute)
274         raise RuntimeError("No element exists with guid %d" % guid)    
275
276     def traces_info(self):
277         traces_info = dict()
278         for guid, testbed in self._testbeds.iteritems():
279             tinfo = testbed.traces_info()
280             if tinfo:
281                 traces_info[guid] = testbed.traces_info()
282         return traces_info
283
284     @staticmethod
285     def _parallel(callables):
286         excs = []
287         def wrap(callable):
288             @functools.wraps(callable)
289             def wrapped(*p, **kw):
290                 try:
291                     callable(*p, **kw)
292                 except:
293                     import traceback
294                     traceback.print_exc(file=sys.stderr)
295                     excs.append(sys.exc_info())
296             return wrapped
297         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
298         for thread in threads:
299             thread.start()
300         for thread in threads:
301             thread.join()
302         for exc in excs:
303             eTyp, eVal, eLoc = exc
304             raise eTyp, eVal, eLoc
305
306     def start(self):
307         self._start()
308
309     def _start(self, recover = False):
310         parser = XmlExperimentParser()
311         
312         if recover:
313             xml = self._experiment_execute_xml
314         else:
315             xml = self._experiment_design_xml
316         data = parser.from_xml_to_data(xml)
317
318         # instantiate testbed controllers
319         to_recover, to_restart = self._init_testbed_controllers(data, recover)
320         all_restart = set(to_restart)
321         
322         if not recover:
323             # persist testbed connection data, for potential recovery
324             self._persist_testbed_proxies()
325         else:
326             # recover recoverable controllers
327             for guid in to_recover:
328                 self._testbeds[guid].do_setup()
329                 self._testbeds[guid].recover()
330         
331         def steps_to_configure(self, allowed_guids):
332             # perform setup in parallel for all test beds,
333             # wait for all threads to finish
334             self._parallel([testbed.do_setup 
335                             for guid,testbed in self._testbeds.iteritems()
336                             if guid in allowed_guids])
337        
338             # perform create-connect in parallel, wait
339             # (internal connections only)
340             self._parallel([testbed.do_create
341                             for guid,testbed in self._testbeds.iteritems()
342                             if guid in allowed_guids])
343
344             self._parallel([testbed.do_connect_init
345                             for guid,testbed in self._testbeds.iteritems()
346                             if guid in allowed_guids])
347
348             self._parallel([testbed.do_connect_compl
349                             for guid,testbed in self._testbeds.iteritems()
350                             if guid in allowed_guids])
351
352             self._parallel([testbed.do_preconfigure
353                             for guid,testbed in self._testbeds.iteritems()
354                             if guid in allowed_guids])
355             self._clear_caches()
356
357         steps_to_configure(self, to_restart)
358
359         if self._netreffed_testbeds:
360             # initally resolve netrefs
361             self.do_netrefs(data, fail_if_undefined=False)
362             
363             # rinse and repeat, for netreffed testbeds
364             netreffed_testbeds = set(self._netreffed_testbeds)
365
366             to_recover, to_restart = self._init_testbed_controllers(data, recover)
367             all_restart.update(to_restart)
368             
369             if not recover:
370                 # persist testbed connection data, for potential recovery
371                 self._persist_testbed_proxies()
372             else:
373                 # recover recoverable controllers
374                 for guid in to_recover:
375                     self._testbeds[guid].do_setup()
376                     self._testbeds[guid].recover()
377
378             # configure dependant testbeds
379             steps_to_configure(self, to_restart)
380         
381         all_restart = [ self._testbeds[guid] for guid in all_restart ]
382             
383         # final netref step, fail if anything's left unresolved
384         self.do_netrefs(data, fail_if_undefined=True)
385        
386         # Only now, that netref dependencies have been solve, it is safe to
387         # program cross_connections
388         self._program_testbed_cross_connections(data)
389  
390         # perform do_configure in parallel for al testbeds
391         # (it's internal configuration for each)
392         self._parallel([testbed.do_configure
393                         for testbed in all_restart])
394
395         self._clear_caches()
396
397         #print >>sys.stderr, "DO IT"
398         #import time
399         #time.sleep(60)
400         
401         # cross-connect (cannot be done in parallel)
402         for guid, testbed in self._testbeds.iteritems():
403             cross_data = self._get_cross_data(guid)
404             testbed.do_cross_connect_init(cross_data)
405         for guid, testbed in self._testbeds.iteritems():
406             cross_data = self._get_cross_data(guid)
407             testbed.do_cross_connect_compl(cross_data)
408        
409         self._clear_caches()
410
411         # Last chance to configure (parallel on all testbeds)
412         self._parallel([testbed.do_prestart
413                         for testbed in all_restart])
414
415         self._clear_caches()
416         
417         if not recover:
418             # update execution xml with execution-specific values
419             # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
420             self._update_execute_xml()
421             self.persist_execute_xml()
422
423         # start experiment (parallel start on all testbeds)
424         self._parallel([testbed.start
425                         for testbed in all_restart])
426
427         self._clear_caches()
428
429     def _clear_caches(self):
430         # Cleaning cache for safety.
431         self._guids_in_testbed_cache = dict()
432
433     def _persist_testbed_proxies(self):
434         TRANSIENT = (DC.RECOVER,)
435         
436         # persist access configuration for all testbeds, so that
437         # recovery mode can reconnect to them if it becomes necessary
438         conf = ConfigParser.RawConfigParser()
439         for testbed_guid, testbed_config in self._deployment_config.iteritems():
440             testbed_guid = str(testbed_guid)
441             conf.add_section(testbed_guid)
442             for attr in testbed_config.get_attribute_list():
443                 if attr not in TRANSIENT:
444                     conf.set(testbed_guid, attr, 
445                         testbed_config.get_attribute_value(attr))
446         
447         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
448         conf.write(f)
449         f.close()
450     
451     def _load_testbed_proxies(self):
452         TYPEMAP = {
453             Attribute.STRING : 'get',
454             Attribute.BOOL : 'getboolean',
455             Attribute.ENUM : 'get',
456             Attribute.DOUBLE : 'getfloat',
457             Attribute.INTEGER : 'getint',
458         }
459         
460         TRANSIENT = (DC.RECOVER,)
461         
462         # deferred import because proxy needs
463         # our class definitions to define proxies
464         import nepi.util.proxy as proxy
465         
466         conf = ConfigParser.RawConfigParser()
467         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
468         for testbed_guid in conf.sections():
469             testbed_config = proxy.AccessConfiguration()
470             testbed_guid = str(testbed_guid)
471             for attr in testbed_config.get_attribute_list():
472                 if attr not in TRANSIENT:
473                     getter = getattr(conf, TYPEMAP.get(
474                         testbed_config.get_attribute_type(attr),
475                         'get') )
476                     testbed_config.set_attribute_value(
477                         attr, getter(testbed_guid, attr))
478     
479     def _unpersist_testbed_proxies(self):
480         try:
481             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
482         except:
483             # Just print exceptions, this is just cleanup
484             import traceback
485             ######## BUG ##########
486             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
487             #traceback.print_exc(file=sys.stderr)
488
489     def _update_execute_xml(self):
490         # For all testbeds,
491         #   For all elements in testbed,
492         #       - gather immutable execute-readable attribuets lists
493         #         asynchronously
494         # Generate new design description from design xml
495         # (Wait for attributes lists - implicit syncpoint)
496         # For all testbeds,
497         #   For all elements in testbed,
498         #       - gather all immutable execute-readable attribute
499         #         values, asynchronously
500         # (Wait for attribute values - implicit syncpoint)
501         # For all testbeds,
502         #   For all elements in testbed,
503         #       - inject non-None values into new design
504         # Generate execute xml from new design
505
506         attribute_lists = dict(
507             (testbed_guid, collections.defaultdict(dict))
508             for testbed_guid in self._testbeds
509         )
510         
511         for testbed_guid, testbed in self._testbeds.iteritems():
512             guids = self._guids_in_testbed(testbed_guid)
513             for guid in guids:
514                 attribute_lists[testbed_guid][guid] = \
515                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
516         
517         parser = XmlExperimentParser()
518         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
519
520         attribute_values = dict(
521             (testbed_guid, collections.defaultdict(dict))
522             for testbed_guid in self._testbeds
523         )
524         
525         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
526             testbed = self._testbeds[testbed_guid]
527             for guid, attribute_list in testbed_attribute_lists.iteritems():
528                 attribute_list = _undefer(attribute_list)
529                 attribute_values[testbed_guid][guid] = dict(
530                     (attribute, testbed.get_deferred(guid, attribute))
531                     for attribute in attribute_list
532                 )
533         
534         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
535             for guid, attribute_values in testbed_attribute_values.iteritems():
536                 for attribute, value in attribute_values.iteritems():
537                     value = _undefer(value)
538                     if value is not None:
539                         execute_data.add_attribute_data(guid, attribute, value)
540         
541         self._experiment_execute_xml = parser.to_xml(data=execute_data)
542
543     def stop(self):
544        for testbed in self._testbeds.values():
545            testbed.stop()
546        self._unpersist_testbed_proxies()
547    
548     def recover(self):
549         # reload perviously persisted testbed access configurations
550         self._load_testbed_proxies()
551
552         self._start(recover = True)
553
554     def is_finished(self, guid):
555         testbed = self._testbed_for_guid(guid)
556         if testbed != None:
557             return testbed.status(guid) == AS.STATUS_FINISHED
558         raise RuntimeError("No element exists with guid %d" % guid)    
559
560     def status(self, guid):
561         testbed = self._testbed_for_guid(guid)
562         if testbed != None:
563             return testbed.status(guid)
564         raise RuntimeError("No element exists with guid %d" % guid)    
565
566     def set(self, guid, name, value, time = TIME_NOW):
567         testbed = self._testbed_for_guid(guid)
568         if testbed != None:
569             testbed.set(guid, name, value, time)
570         else:
571             raise RuntimeError("No element exists with guid %d" % guid)    
572
573     def get(self, guid, name, time = TIME_NOW):
574         testbed = self._testbed_for_guid(guid)
575         if testbed != None:
576             return testbed.get(guid, name, time)
577         raise RuntimeError("No element exists with guid %d" % guid)    
578
579     def get_deferred(self, guid, name, time = TIME_NOW):
580         testbed = self._testbed_for_guid(guid)
581         if testbed != None:
582             return testbed.get_deferred(guid, name, time)
583         raise RuntimeError("No element exists with guid %d" % guid)    
584
585     def get_factory_id(self, guid):
586         testbed = self._testbed_for_guid(guid)
587         if testbed != None:
588             return testbed.get_factory_id(guid)
589         raise RuntimeError("No element exists with guid %d" % guid)    
590
591     def get_testbed_id(self, guid):
592         testbed = self._testbed_for_guid(guid)
593         if testbed != None:
594             return testbed.testbed_id
595         raise RuntimeError("No element exists with guid %d" % guid)    
596
597     def get_testbed_version(self, guid):
598         testbed = self._testbed_for_guid(guid)
599         if testbed != None:
600             return testbed.testbed_version
601         raise RuntimeError("No element exists with guid %d" % guid)    
602
603     def shutdown(self):
604         exceptions = list()
605         for testbed in self._testbeds.values():
606             try:
607                 testbed.shutdown()
608             except:
609                 exceptions.append(sys.exc_info())
610         for exc_info in exceptions:
611             raise exc_info[0], exc_info[1], exc_info[2]
612
613     def _testbed_for_guid(self, guid):
614         for testbed_guid in self._testbeds.keys():
615             if guid in self._guids_in_testbed(testbed_guid):
616                 return self._testbeds[testbed_guid]
617         return None
618
619     def _guids_in_testbed(self, testbed_guid):
620         if testbed_guid not in self._testbeds:
621             return set()
622         if testbed_guid not in self._guids_in_testbed_cache:
623             self._guids_in_testbed_cache[testbed_guid] = \
624                 set(self._testbeds[testbed_guid].guids)
625         return self._guids_in_testbed_cache[testbed_guid]
626
627     @staticmethod
628     def _netref_component_split(component):
629         match = COMPONENT_PATTERN.match(component)
630         if match:
631             return match.group("kind"), match.group("index")
632         else:
633             return component, None
634
635     _NETREF_COMPONENT_GETTERS = {
636         'addr':
637             lambda testbed, guid, index, name: 
638                 testbed.get_address(guid, int(index), name),
639         'route' :
640             lambda testbed, guid, index, name: 
641                 testbed.get_route(guid, int(index), name),
642         'trace' :
643             lambda testbed, guid, index, name: 
644                 testbed.trace(guid, index, name),
645         '' : 
646             lambda testbed, guid, index, name: 
647                 testbed.get(guid, name),
648     }
649     
650     def resolve_netref_value(self, value, failval = None):
651         match = ATTRIBUTE_PATTERN_BASE.search(value)
652         if match:
653             label = match.group("label")
654             if label.startswith('GUID-'):
655                 ref_guid = int(label[5:])
656                 if ref_guid:
657                     expr = match.group("expr")
658                     component = (match.group("component") or "")[1:] # skip the dot
659                     attribute = match.group("attribute")
660                     
661                     # split compound components into component kind and index
662                     # eg: 'addr[0]' -> ('addr', '0')
663                     component, component_index = self._netref_component_split(component)
664
665                     # find object and resolve expression
666                     for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
667                         if component not in self._NETREF_COMPONENT_GETTERS:
668                             raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
669                         elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
670                             pass
671                         else:
672                             ref_value = self._NETREF_COMPONENT_GETTERS[component](
673                                 ref_testbed, ref_guid, component_index, attribute)
674                             if ref_value:
675                                 return value.replace(match.group(), ref_value)
676         # couldn't find value
677         return failval
678     
679     def do_netrefs(self, data, fail_if_undefined = False):
680         # element netrefs
681         for (testbed_guid, guid), attrs in self._netrefs.items():
682             testbed = self._testbeds.get(testbed_guid)
683             if testbed is not None:
684                 for name in set(attrs):
685                     value = testbed.get(guid, name)
686                     if isinstance(value, basestring):
687                         ref_value = self.resolve_netref_value(value)
688                         if ref_value is not None:
689                             testbed.set(guid, name, ref_value)
690                             attrs.remove(name)
691                         elif fail_if_undefined:
692                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
693                 if not attrs:
694                     del self._netrefs[(testbed_guid, guid)]
695         
696         # testbed netrefs
697         for testbed_guid, attrs in self._testbed_netrefs.items():
698             tb_data = dict(data.get_attribute_data(testbed_guid))
699             if data:
700                 for name in set(attrs):
701                     value = tb_data.get(name)
702                     if isinstance(value, basestring):
703                         ref_value = self.resolve_netref_value(value)
704                         if ref_value is not None:
705                             data.set_attribute_data(testbed_guid, name, ref_value)
706                             attrs.remove(name)
707                         elif fail_if_undefined:
708                             raise ValueError, "Unresolvable netref in: %r" % (value,)
709                 if not attrs:
710                     del self._testbed_netrefs[testbed_guid]
711         
712
    def _init_testbed_controllers(self, data, recover = False):
        """Create missing testbed controllers and program their elements.

        Params:
            data: parsed experiment data (design or execute XML)
            recover: True when reattaching to a previous run

        Steps:
          1. Map element labels to guids (duplicate labels raise RuntimeError).
          2. For every testbed in ``data`` without a controller yet, create
             one. On success: in recover mode the testbed is blacklisted
             (its elements are not programmed again); otherwise it is queued
             for restart. On failure in recover mode, the testbed's
             DC.RECOVERY_POLICY decides: POLICY_FAIL or an unknown policy
             re-raises; POLICY_RECOVER / POLICY_RESTART recreate the
             controller with recover=False and queue it in to_recover /
             to_restart. Failures outside recover mode always propagate.
          3. Collect the elements of non-blacklisted testbeds, rewrite label
             references to GUID form (_resolve_labels), and program them.

        Returns:
            (to_recover, to_restart): sets of testbed guids.
        """
        # testbeds that already had a controller before this call are never
        # reprogrammed
        blacklist_testbeds = set(self._testbeds)
        element_guids = list()
        label_guids = dict()
        data_guids = data.guids
        to_recover = set()
        to_restart = set()

        # gather label associations
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                # NOTE(review): the unpacked values are unused in this loop
                (testbed_guid, factory_id) = data.get_box_data(guid)
                label = data.get_attribute_data(guid, "label")
                if label is not None:
                    if label in label_guids:
                        raise RuntimeError, "Label %r is not unique" % (label,)
                    label_guids[label] = guid

        # create testbed controllers
        # (element_guids is handed to _create_testbed_controller; what that
        # helper does with it is not visible here)
        for guid in data_guids:
            if data.is_testbed_data(guid):
                if guid not in self._testbeds:
                    try:
                        self._create_testbed_controller(
                            guid, data, element_guids, recover)
                        if recover:
                            # Already programmed
                            blacklist_testbeds.add(guid)
                        else:
                            to_restart.add(guid)
                    except:
                        if recover:
                            # apply the testbed's recovery policy; each bare
                            # `raise` re-raises the failed creation's error
                            policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
                            if policy == DC.POLICY_FAIL:
                                raise
                            elif policy == DC.POLICY_RECOVER:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_recover.add(guid)
                            elif policy == DC.POLICY_RESTART:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_restart.add(guid)
                            else:
                                raise
                        else:
                            raise
        
        # queue programmable elements
        #  - that have not been programmed already (blacklist_testbeds)
        #  - including recovered or restarted testbeds
        #  - but those that have no unresolved netrefs
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                (testbed_guid, factory_id) = data.get_box_data(guid)
                if testbed_guid not in blacklist_testbeds:
                    element_guids.append(guid)

        # replace references to elements labels for its guid
        self._resolve_labels(data, data_guids, label_guids)
    
        # program testbed controllers
        if element_guids:
            self._program_testbed_controllers(element_guids, data)
        
        return to_recover, to_restart
779
780     def _resolve_labels(self, data, data_guids, label_guids):
781         netrefs = self._netrefs
782         testbed_netrefs = self._testbed_netrefs
783         for guid in data_guids:
784             for name, value in data.get_attribute_data(guid):
785                 if isinstance(value, basestring):
786                     match = ATTRIBUTE_PATTERN_BASE.search(value)
787                     if match:
788                         label = match.group("label")
789                         if not label.startswith('GUID-'):
790                             ref_guid = label_guids.get(label)
791                             if ref_guid is not None:
792                                 value = ATTRIBUTE_PATTERN_BASE.sub(
793                                     ATTRIBUTE_PATTERN_GUID_SUB % dict(
794                                         guid = 'GUID-%d' % (ref_guid,),
795                                         expr = match.group("expr"),
796                                         label = label), 
797                                     value)
798                                 data.set_attribute_data(guid, name, value)
799                                 
800                                 # memorize which guid-attribute pairs require
801                                 # postprocessing, to avoid excessive controller-testbed
802                                 # communication at configuration time
803                                 # (which could require high-latency network I/O)
804                                 if not data.is_testbed_data(guid):
805                                     (testbed_guid, factory_id) = data.get_box_data(guid)
806                                     netrefs[(testbed_guid, guid)].add(name)
807                                 else:
808                                     testbed_netrefs[guid].add(name)
809
810     def _create_testbed_controller(self, guid, data, element_guids, recover):
811         (testbed_id, testbed_version) = data.get_testbed_data(guid)
812         deployment_config = self._deployment_config.get(guid)
813         
814         # deferred import because proxy needs
815         # our class definitions to define proxies
816         import nepi.util.proxy as proxy
817         
818         if deployment_config is None:
819             # need to create one
820             deployment_config = proxy.AccessConfiguration()
821             
822             for (name, value) in data.get_attribute_data(guid):
823                 if value is not None and deployment_config.has_attribute(name):
824                     # if any deployment config attribute has a netref, we can't
825                     # create this controller yet
826                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
827                         # remember to re-issue this one
828                         self._netreffed_testbeds.add(guid)
829                         return
830                     
831                     # copy deployment config attribute
832                     deployment_config.set_attribute_value(name, value)
833             
834             # commit config
835             self._deployment_config[guid] = deployment_config
836         
837         if deployment_config is not None:
838             # force recovery mode 
839             deployment_config.set_attribute_value("recover",recover)
840         
841         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
842                 deployment_config)
843         for (name, value) in data.get_attribute_data(guid):
844             testbed.defer_configure(name, value)
845         self._testbeds[guid] = testbed
846         if guid in self._netreffed_testbeds:
847             self._netreffed_testbeds.remove(guid)
848
849     def _program_testbed_controllers(self, element_guids, data):
850         def resolve_create_netref(data, guid, name, value): 
851             # Try to resolve create-time netrefs, if possible
852             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
853                 try:
854                     nuvalue = self.resolve_netref_value(value)
855                 except:
856                     # Any trouble means we're not in shape to resolve the netref yet
857                     nuvalue = None
858                 if nuvalue is not None:
859                     # Only if we succeed we remove the netref deferral entry
860                     value = nuvalue
861                     data.set_attribute_data(guid, name, value)
862                     if (testbed_guid, guid) in self._netrefs:
863                         self._netrefs[(testbed_guid, guid)].discard(name)
864             return value
865
866         for guid in element_guids:
867             (testbed_guid, factory_id) = data.get_box_data(guid)
868             testbed = self._testbeds.get(testbed_guid)
869             if testbed is not None:
870                 # create
871                 testbed.defer_create(guid, factory_id)
872                 # set attributes
873                 for (name, value) in data.get_attribute_data(guid):
874                     value = resolve_create_netref(data, guid, name, value)
875                     testbed.defer_create_set(guid, name, value)
876
877         for guid in element_guids:
878             (testbed_guid, factory_id) = data.get_box_data(guid)
879             testbed = self._testbeds.get(testbed_guid)
880             if testbed is not None:
881                 # traces
882                 for trace_id in data.get_trace_data(guid):
883                     testbed.defer_add_trace(guid, trace_id)
884                 # addresses
885                 for (address, netprefix, broadcast) in data.get_address_data(guid):
886                     if address != None:
887                         testbed.defer_add_address(guid, address, netprefix, 
888                                 broadcast)
889                 # routes
890                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
891                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
892                 # store connections data
893                 for (connector_type_name, other_guid, other_connector_type_name) \
894                         in data.get_connection_data(guid):
895                     (other_testbed_guid, other_factory_id) = data.get_box_data(
896                             other_guid)
897                     if testbed_guid == other_testbed_guid:
898                         # each testbed should take care of enforcing internal
899                         # connection simmetry, so each connection is only
900                         # added in one direction
901                         testbed.defer_connect(guid, connector_type_name, 
902                                 other_guid, other_connector_type_name)
903
904     def _program_testbed_cross_connections(self, data):
905         data_guids = data.guids
906         for guid in data_guids: 
907             if not data.is_testbed_data(guid):
908                 (testbed_guid, factory_id) = data.get_box_data(guid)
909                 testbed = self._testbeds.get(testbed_guid)
910                 if testbed is not None:
911                     for (connector_type_name, cross_guid, cross_connector_type_name) \
912                             in data.get_connection_data(guid):
913                         (testbed_guid, factory_id) = data.get_box_data(guid)
914                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
915                                 cross_guid)
916                         if testbed_guid != cross_testbed_guid:
917                             cross_testbed = self._testbeds[cross_testbed_guid]
918                             cross_testbed_id = cross_testbed.testbed_id
919                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
920                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
921                                     cross_connector_type_name)
922                             # save cross data for later
923                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
924                                     cross_guid)
925
926     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
927         if testbed_guid not in self._cross_data:
928             self._cross_data[testbed_guid] = dict()
929         if cross_testbed_guid not in self._cross_data[testbed_guid]:
930             self._cross_data[testbed_guid][cross_testbed_guid] = set()
931         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
932
933     def _get_cross_data(self, testbed_guid):
934         cross_data = dict()
935         if not testbed_guid in self._cross_data:
936             return cross_data
937
938         # fetch attribute lists in one batch
939         attribute_lists = dict()
940         for cross_testbed_guid, guid_list in \
941                 self._cross_data[testbed_guid].iteritems():
942             cross_testbed = self._testbeds[cross_testbed_guid]
943             for cross_guid in guid_list:
944                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
945                     cross_testbed.get_attribute_list_deferred(cross_guid)
946
947         # fetch attribute values in another batch
948         for cross_testbed_guid, guid_list in \
949                 self._cross_data[testbed_guid].iteritems():
950             cross_data[cross_testbed_guid] = dict()
951             cross_testbed = self._testbeds[cross_testbed_guid]
952             for cross_guid in guid_list:
953                 elem_cross_data = dict(
954                     _guid = cross_guid,
955                     _testbed_guid = cross_testbed_guid,
956                     _testbed_id = cross_testbed.testbed_id,
957                     _testbed_version = cross_testbed.testbed_version)
958                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
959                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
960                 for attr_name in attribute_list:
961                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
962                     elem_cross_data[attr_name] = attr_value
963         
964         # undefer all values - we'll have to serialize them probably later
965         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
966             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
967                 for attr_name, attr_value in elem_cross_data.iteritems():
968                     elem_cross_data[attr_name] = _undefer(attr_value)
969         
970         return cross_data
971