More documentation of recovery procedures
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15
# Netref expression embedded in an attribute value:
#
#     {#[<label>]<expr>#}
#
# where <expr> is an optional component selector (".addr[N]", ".route[N]"
# or ".trace[N]") followed by ".[<attribute>]".
# The separator before "[<attribute>]" is an escaped, literal dot: left
# unescaped it would accept any character in that position.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# Template used to rewrite a netref so that it refers to a guid label
# (expects the "guid" and "expr" keys).
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19
def _undefer(deferred):
    """
    Resolve a possibly-deferred value.

    Objects exposing a ``_get`` attribute are treated as deferred results
    and the outcome of calling ``_get()`` is returned; any other object is
    returned unchanged.
    """
    return deferred._get() if hasattr(deferred, '_get') else deferred
25
26
class TestbedController(object):
    """
    Abstract interface of a testbed controller.

    Only the constructor and the identification properties are implemented;
    every other member raises NotImplementedError and has to be provided by
    a concrete testbed implementation.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction time"""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction time"""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements handled by this testbed instance"""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of an element"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1,
            guid2, connector_type_name2):
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric=0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and
        their attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed
        elements are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross
        connections. Useful for early stages of configuration, for setting
        up stuff that might be required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Starts the testbed (abstract)"""
        raise NotImplementedError

    def stop(self):
        """Stops the testbed (abstract)"""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the
        controller instance will be re-created and the following sequence
        invoked:

            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable
                attributes) should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>

        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time=TIME_NOW):
        """Sets attribute 'name' of element 'guid' to 'value' (abstract)"""
        raise NotImplementedError

    def get(self, guid, name, time=TIME_NOW):
        """Gets the value of attribute 'name' of element 'guid' (abstract)"""
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags=None, exclude=False):
        """
        Lists attributes of element 'guid' (abstract).

        NOTE(review): the meaning of filter_flags / exclude is not defined
        at this level - presumably a flag-based filter; confirm against
        the concrete implementations.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Gets the factory id of element 'guid' (abstract)"""
        raise NotImplementedError

    def action(self, time, guid, action):
        raise NotImplementedError

    def status(self, guid):
        """Gets the status of element 'guid' (abstract)"""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Gets 'attribute' of trace 'trace_id' of element 'guid' (abstract)"""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Shuts the testbed down (abstract)"""
        raise NotImplementedError
222
223 class ExperimentController(object):
224     def __init__(self, experiment_xml, root_dir):
225         self._experiment_design_xml = experiment_xml
226         self._experiment_execute_xml = None
227         self._testbeds = dict()
228         self._deployment_config = dict()
229         self._netrefs = collections.defaultdict(set)
230         self._testbed_netrefs = collections.defaultdict(set)
231         self._cross_data = dict()
232         self._root_dir = root_dir
233         self._netreffed_testbeds = set()
234         self._guids_in_testbed_cache = dict()
235         
236         if experiment_xml is None and root_dir is not None:
237             # Recover
238             self.load_experiment_xml()
239             self.load_execute_xml()
240         else:
241             self.persist_experiment_xml()
242
243     @property
244     def experiment_design_xml(self):
245         return self._experiment_design_xml
246
247     @property
248     def experiment_execute_xml(self):
249         return self._experiment_execute_xml
250
251     @property
252     def guids(self):
253         guids = list()
254         for testbed_guid in self._testbeds.keys():
255             _guids = self._guids_in_testbed(testbed_guid)
256             if _guids:
257                 guids.extend(_guids)
258         return guids
259
260     def persist_experiment_xml(self):
261         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
262         f = open(xml_path, "w")
263         f.write(self._experiment_design_xml)
264         f.close()
265
266     def persist_execute_xml(self):
267         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
268         f = open(xml_path, "w")
269         f.write(self._experiment_execute_xml)
270         f.close()
271
272     def load_experiment_xml(self):
273         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
274         f = open(xml_path, "r")
275         self._experiment_design_xml = f.read()
276         f.close()
277
278     def load_execute_xml(self):
279         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
280         f = open(xml_path, "r")
281         self._experiment_execute_xml = f.read()
282         f.close()
283
284     def trace(self, guid, trace_id, attribute='value'):
285         testbed = self._testbed_for_guid(guid)
286         if testbed != None:
287             return testbed.trace(guid, trace_id, attribute)
288         raise RuntimeError("No element exists with guid %d" % guid)    
289
290     def traces_info(self):
291         traces_info = dict()
292         for guid, testbed in self._testbeds.iteritems():
293             tinfo = testbed.traces_info()
294             if tinfo:
295                 traces_info[guid] = testbed.traces_info()
296         return traces_info
297
298     @staticmethod
299     def _parallel(callables):
300         excs = []
301         def wrap(callable):
302             @functools.wraps(callable)
303             def wrapped(*p, **kw):
304                 try:
305                     callable(*p, **kw)
306                 except:
307                     import traceback
308                     traceback.print_exc(file=sys.stderr)
309                     excs.append(sys.exc_info())
310             return wrapped
311         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
312         for thread in threads:
313             thread.start()
314         for thread in threads:
315             thread.join()
316         for exc in excs:
317             eTyp, eVal, eLoc = exc
318             raise eTyp, eVal, eLoc
319
320     def start(self):
321         self._start()
322
    def _start(self, recover = False):
        """
        Deploys and starts the experiment.

        Params:

            recover: when True, the persisted execute XML is used instead
                of the design XML, controllers flagged for recovery get
                do_setup + recover invoked, and neither the testbed access
                configuration nor the execute XML are persisted again.

        Sequence: instantiate testbed controllers, run the setup / create /
        connect / preconfigure steps on the testbeds to restart, resolve
        netrefs (instantiating netref-dependent testbeds in a second round),
        program cross connections, configure, cross-connect, prestart and
        finally start the restarted testbeds.
        """
        parser = XmlExperimentParser()
        
        if recover:
            xml = self._experiment_execute_xml
        else:
            xml = self._experiment_design_xml
        data = parser.from_xml_to_data(xml)

        # instantiate testbed controllers
        to_recover, to_restart = self._init_testbed_controllers(data, recover)
        # accumulates the guids of every testbed (re)started in this run
        all_restart = set(to_restart)
        
        if not recover:
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()
        else:
            # recover recoverable controllers
            for guid in to_recover:
                self._testbeds[guid].do_setup()
                self._testbeds[guid].recover()
        
        def steps_to_configure(self, allowed_guids):
            # perform setup in parallel for all testbeds,
            # wait for all threads to finish
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        steps_to_configure(self, to_restart)

        if self._netreffed_testbeds:
            # initially resolve netrefs
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            netreffed_testbeds = set(self._netreffed_testbeds)

            to_recover, to_restart = self._init_testbed_controllers(data, recover)
            all_restart.update(to_restart)
            
            if not recover:
                # persist testbed connection data, for potential recovery
                self._persist_testbed_proxies()
            else:
                # recover recoverable controllers
                for guid in to_recover:
                    self._testbeds[guid].do_setup()
                    self._testbeds[guid].recover()

            # configure dependent testbeds
            steps_to_configure(self, to_restart)
        
        # from here on all_restart holds controller objects, not guids
        all_restart = [ self._testbeds[guid] for guid in all_restart ]
            
        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
       
        # Only now, that netref dependencies have been solved, it is safe to
        # program cross_connections
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure
                        for testbed in all_restart])

        self._clear_caches()

        #print >>sys.stderr, "DO IT"
        #import time
        #time.sleep(60)
        
        # cross-connect (cannot be done in parallel)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._parallel([testbed.do_prestart
                        for testbed in all_restart])

        self._clear_caches()
        
        if not recover:
            # update execution xml with execution-specific values
            # TODO: BUG! Buggy code! Cannot stand serializing all attribute values (e.g. tun_key, which is non-ASCII)
            self._update_execute_xml()
            self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start
                        for testbed in all_restart])

        self._clear_caches()
442
443     def _clear_caches(self):
444         # Cleaning cache for safety.
445         self._guids_in_testbed_cache = dict()
446
447     def _persist_testbed_proxies(self):
448         TRANSIENT = (DC.RECOVER,)
449         
450         # persist access configuration for all testbeds, so that
451         # recovery mode can reconnect to them if it becomes necessary
452         conf = ConfigParser.RawConfigParser()
453         for testbed_guid, testbed_config in self._deployment_config.iteritems():
454             testbed_guid = str(testbed_guid)
455             conf.add_section(testbed_guid)
456             for attr in testbed_config.get_attribute_list():
457                 if attr not in TRANSIENT:
458                     conf.set(testbed_guid, attr, 
459                         testbed_config.get_attribute_value(attr))
460         
461         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
462         conf.write(f)
463         f.close()
464     
465     def _load_testbed_proxies(self):
466         TYPEMAP = {
467             Attribute.STRING : 'get',
468             Attribute.BOOL : 'getboolean',
469             Attribute.ENUM : 'get',
470             Attribute.DOUBLE : 'getfloat',
471             Attribute.INTEGER : 'getint',
472         }
473         
474         TRANSIENT = (DC.RECOVER,)
475         
476         # deferred import because proxy needs
477         # our class definitions to define proxies
478         import nepi.util.proxy as proxy
479         
480         conf = ConfigParser.RawConfigParser()
481         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
482         for testbed_guid in conf.sections():
483             testbed_config = proxy.AccessConfiguration()
484             testbed_guid = str(testbed_guid)
485             for attr in testbed_config.get_attribute_list():
486                 if attr not in TRANSIENT:
487                     getter = getattr(conf, TYPEMAP.get(
488                         testbed_config.get_attribute_type(attr),
489                         'get') )
490                     testbed_config.set_attribute_value(
491                         attr, getter(testbed_guid, attr))
492     
493     def _unpersist_testbed_proxies(self):
494         try:
495             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
496         except:
497             # Just print exceptions, this is just cleanup
498             import traceback
499             ######## BUG ##########
500             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
501             #traceback.print_exc(file=sys.stderr)
502
503     def _update_execute_xml(self):
504         # For all testbeds,
505         #   For all elements in testbed,
506         #       - gather immutable execute-readable attribuets lists
507         #         asynchronously
508         # Generate new design description from design xml
509         # (Wait for attributes lists - implicit syncpoint)
510         # For all testbeds,
511         #   For all elements in testbed,
512         #       - gather all immutable execute-readable attribute
513         #         values, asynchronously
514         # (Wait for attribute values - implicit syncpoint)
515         # For all testbeds,
516         #   For all elements in testbed,
517         #       - inject non-None values into new design
518         # Generate execute xml from new design
519
520         attribute_lists = dict(
521             (testbed_guid, collections.defaultdict(dict))
522             for testbed_guid in self._testbeds
523         )
524         
525         for testbed_guid, testbed in self._testbeds.iteritems():
526             guids = self._guids_in_testbed(testbed_guid)
527             for guid in guids:
528                 attribute_lists[testbed_guid][guid] = \
529                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
530         
531         parser = XmlExperimentParser()
532         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
533
534         attribute_values = dict(
535             (testbed_guid, collections.defaultdict(dict))
536             for testbed_guid in self._testbeds
537         )
538         
539         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
540             testbed = self._testbeds[testbed_guid]
541             for guid, attribute_list in testbed_attribute_lists.iteritems():
542                 attribute_list = _undefer(attribute_list)
543                 attribute_values[testbed_guid][guid] = dict(
544                     (attribute, testbed.get_deferred(guid, attribute))
545                     for attribute in attribute_list
546                 )
547         
548         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
549             for guid, attribute_values in testbed_attribute_values.iteritems():
550                 for attribute, value in attribute_values.iteritems():
551                     value = _undefer(value)
552                     if value is not None:
553                         execute_data.add_attribute_data(guid, attribute, value)
554         
555         self._experiment_execute_xml = parser.to_xml(data=execute_data)
556
557     def stop(self):
558        for testbed in self._testbeds.values():
559            testbed.stop()
560        self._unpersist_testbed_proxies()
561    
562     def recover(self):
563         # reload perviously persisted testbed access configurations
564         self._load_testbed_proxies()
565
566         self._start(recover = True)
567
568     def is_finished(self, guid):
569         testbed = self._testbed_for_guid(guid)
570         if testbed != None:
571             return testbed.status(guid) == AS.STATUS_FINISHED
572         raise RuntimeError("No element exists with guid %d" % guid)    
573
574     def status(self, guid):
575         testbed = self._testbed_for_guid(guid)
576         if testbed != None:
577             return testbed.status(guid)
578         raise RuntimeError("No element exists with guid %d" % guid)    
579
580     def set(self, guid, name, value, time = TIME_NOW):
581         testbed = self._testbed_for_guid(guid)
582         if testbed != None:
583             testbed.set(guid, name, value, time)
584         else:
585             raise RuntimeError("No element exists with guid %d" % guid)    
586
587     def get(self, guid, name, time = TIME_NOW):
588         testbed = self._testbed_for_guid(guid)
589         if testbed != None:
590             return testbed.get(guid, name, time)
591         raise RuntimeError("No element exists with guid %d" % guid)    
592
593     def get_deferred(self, guid, name, time = TIME_NOW):
594         testbed = self._testbed_for_guid(guid)
595         if testbed != None:
596             return testbed.get_deferred(guid, name, time)
597         raise RuntimeError("No element exists with guid %d" % guid)    
598
599     def get_factory_id(self, guid):
600         testbed = self._testbed_for_guid(guid)
601         if testbed != None:
602             return testbed.get_factory_id(guid)
603         raise RuntimeError("No element exists with guid %d" % guid)    
604
605     def get_testbed_id(self, guid):
606         testbed = self._testbed_for_guid(guid)
607         if testbed != None:
608             return testbed.testbed_id
609         raise RuntimeError("No element exists with guid %d" % guid)    
610
611     def get_testbed_version(self, guid):
612         testbed = self._testbed_for_guid(guid)
613         if testbed != None:
614             return testbed.testbed_version
615         raise RuntimeError("No element exists with guid %d" % guid)    
616
617     def shutdown(self):
618         exceptions = list()
619         for testbed in self._testbeds.values():
620             try:
621                 testbed.shutdown()
622             except:
623                 exceptions.append(sys.exc_info())
624         for exc_info in exceptions:
625             raise exc_info[0], exc_info[1], exc_info[2]
626
627     def _testbed_for_guid(self, guid):
628         for testbed_guid in self._testbeds.keys():
629             if guid in self._guids_in_testbed(testbed_guid):
630                 return self._testbeds[testbed_guid]
631         return None
632
633     def _guids_in_testbed(self, testbed_guid):
634         if testbed_guid not in self._testbeds:
635             return set()
636         if testbed_guid not in self._guids_in_testbed_cache:
637             self._guids_in_testbed_cache[testbed_guid] = \
638                 set(self._testbeds[testbed_guid].guids)
639         return self._guids_in_testbed_cache[testbed_guid]
640
641     @staticmethod
642     def _netref_component_split(component):
643         match = COMPONENT_PATTERN.match(component)
644         if match:
645             return match.group("kind"), match.group("index")
646         else:
647             return component, None
648
649     _NETREF_COMPONENT_GETTERS = {
650         'addr':
651             lambda testbed, guid, index, name: 
652                 testbed.get_address(guid, int(index), name),
653         'route' :
654             lambda testbed, guid, index, name: 
655                 testbed.get_route(guid, int(index), name),
656         'trace' :
657             lambda testbed, guid, index, name: 
658                 testbed.trace(guid, index, name),
659         '' : 
660             lambda testbed, guid, index, name: 
661                 testbed.get(guid, name),
662     }
663     
664     def resolve_netref_value(self, value, failval = None):
665         match = ATTRIBUTE_PATTERN_BASE.search(value)
666         if match:
667             label = match.group("label")
668             if label.startswith('GUID-'):
669                 ref_guid = int(label[5:])
670                 if ref_guid:
671                     expr = match.group("expr")
672                     component = (match.group("component") or "")[1:] # skip the dot
673                     attribute = match.group("attribute")
674                     
675                     # split compound components into component kind and index
676                     # eg: 'addr[0]' -> ('addr', '0')
677                     component, component_index = self._netref_component_split(component)
678
679                     # find object and resolve expression
680                     for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
681                         if component not in self._NETREF_COMPONENT_GETTERS:
682                             raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
683                         elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
684                             pass
685                         else:
686                             ref_value = self._NETREF_COMPONENT_GETTERS[component](
687                                 ref_testbed, ref_guid, component_index, attribute)
688                             if ref_value:
689                                 return value.replace(match.group(), ref_value)
690         # couldn't find value
691         return failval
692     
693     def do_netrefs(self, data, fail_if_undefined = False):
694         # element netrefs
695         for (testbed_guid, guid), attrs in self._netrefs.items():
696             testbed = self._testbeds.get(testbed_guid)
697             if testbed is not None:
698                 for name in set(attrs):
699                     value = testbed.get(guid, name)
700                     if isinstance(value, basestring):
701                         ref_value = self.resolve_netref_value(value)
702                         if ref_value is not None:
703                             testbed.set(guid, name, ref_value)
704                             attrs.remove(name)
705                         elif fail_if_undefined:
706                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
707                 if not attrs:
708                     del self._netrefs[(testbed_guid, guid)]
709         
710         # testbed netrefs
711         for testbed_guid, attrs in self._testbed_netrefs.items():
712             tb_data = dict(data.get_attribute_data(testbed_guid))
713             if data:
714                 for name in set(attrs):
715                     value = tb_data.get(name)
716                     if isinstance(value, basestring):
717                         ref_value = self.resolve_netref_value(value)
718                         if ref_value is not None:
719                             data.set_attribute_data(testbed_guid, name, ref_value)
720                             attrs.remove(name)
721                         elif fail_if_undefined:
722                             raise ValueError, "Unresolvable netref in: %r" % (value,)
723                 if not attrs:
724                     del self._testbed_netrefs[testbed_guid]
725         
726
    def _init_testbed_controllers(self, data, recover = False):
        """
        Creates the controllers of the testbeds in 'data' that have no
        controller yet, and programs them with their elements.

        Params:

            data: experiment data, as produced by the XML parser
            recover: when True, controllers are first created in recovery
                mode; if that fails, the testbed's DC.RECOVERY_POLICY
                decides between failing, recovering or restarting it.

        Returns the tuple (to_recover, to_restart) with the sets of guids
        of the testbeds created in this call that the caller has to
        recover or restart, respectively.

        Raises RuntimeError if two elements share the same label.
        """
        # testbeds that already had a controller before this call
        blacklist_testbeds = set(self._testbeds)
        element_guids = list()
        label_guids = dict()
        data_guids = data.guids
        to_recover = set()
        to_restart = set()

        # gather label associations
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                (testbed_guid, factory_id) = data.get_box_data(guid)
                label = data.get_attribute_data(guid, "label")
                if label is not None:
                    if label in label_guids:
                        raise RuntimeError, "Label %r is not unique" % (label,)
                    label_guids[label] = guid

        # create testbed controllers
        for guid in data_guids:
            if data.is_testbed_data(guid):
                if guid not in self._testbeds:
                    try:
                        self._create_testbed_controller(
                            guid, data, element_guids, recover)
                        if recover:
                            # Already programmed
                            blacklist_testbeds.add(guid)
                        else:
                            to_restart.add(guid)
                    except:
                        if recover:
                            # recovery-mode creation failed: apply the
                            # testbed's recovery policy, re-creating the
                            # controller in normal mode where it allows so
                            policy = data.get_attribute_data(guid, DC.RECOVERY_POLICY)
                            if policy == DC.POLICY_FAIL:
                                raise
                            elif policy == DC.POLICY_RECOVER:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_recover.add(guid)
                            elif policy == DC.POLICY_RESTART:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_restart.add(guid)
                            else:
                                # unknown policy: propagate the failure
                                raise
                        else:
                            raise
        
        # queue programmable elements
        #  - that have not been programmed already (blacklist_testbeds)
        #  - including recovered or restarted testbeds
        #  - but those that have no unresolved netrefs
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                (testbed_guid, factory_id) = data.get_box_data(guid)
                if testbed_guid not in blacklist_testbeds:
                    element_guids.append(guid)

        # replace references to element labels with their guids
        self._resolve_labels(data, data_guids, label_guids)
    
        # program testbed controllers
        if element_guids:
            self._program_testbed_controllers(element_guids, data)
        
        return to_recover, to_restart
793
794     def _resolve_labels(self, data, data_guids, label_guids):
795         netrefs = self._netrefs
796         testbed_netrefs = self._testbed_netrefs
797         for guid in data_guids:
798             for name, value in data.get_attribute_data(guid):
799                 if isinstance(value, basestring):
800                     match = ATTRIBUTE_PATTERN_BASE.search(value)
801                     if match:
802                         label = match.group("label")
803                         if not label.startswith('GUID-'):
804                             ref_guid = label_guids.get(label)
805                             if ref_guid is not None:
806                                 value = ATTRIBUTE_PATTERN_BASE.sub(
807                                     ATTRIBUTE_PATTERN_GUID_SUB % dict(
808                                         guid = 'GUID-%d' % (ref_guid,),
809                                         expr = match.group("expr"),
810                                         label = label), 
811                                     value)
812                                 data.set_attribute_data(guid, name, value)
813                                 
814                                 # memorize which guid-attribute pairs require
815                                 # postprocessing, to avoid excessive controller-testbed
816                                 # communication at configuration time
817                                 # (which could require high-latency network I/O)
818                                 if not data.is_testbed_data(guid):
819                                     (testbed_guid, factory_id) = data.get_box_data(guid)
820                                     netrefs[(testbed_guid, guid)].add(name)
821                                 else:
822                                     testbed_netrefs[guid].add(name)
823
824     def _create_testbed_controller(self, guid, data, element_guids, recover):
825         (testbed_id, testbed_version) = data.get_testbed_data(guid)
826         deployment_config = self._deployment_config.get(guid)
827         
828         # deferred import because proxy needs
829         # our class definitions to define proxies
830         import nepi.util.proxy as proxy
831         
832         if deployment_config is None:
833             # need to create one
834             deployment_config = proxy.AccessConfiguration()
835             
836             for (name, value) in data.get_attribute_data(guid):
837                 if value is not None and deployment_config.has_attribute(name):
838                     # if any deployment config attribute has a netref, we can't
839                     # create this controller yet
840                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
841                         # remember to re-issue this one
842                         self._netreffed_testbeds.add(guid)
843                         return
844                     
845                     # copy deployment config attribute
846                     deployment_config.set_attribute_value(name, value)
847             
848             # commit config
849             self._deployment_config[guid] = deployment_config
850         
851         if deployment_config is not None:
852             # force recovery mode 
853             deployment_config.set_attribute_value("recover",recover)
854         
855         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
856                 deployment_config)
857         for (name, value) in data.get_attribute_data(guid):
858             testbed.defer_configure(name, value)
859         self._testbeds[guid] = testbed
860         if guid in self._netreffed_testbeds:
861             self._netreffed_testbeds.remove(guid)
862
863     def _program_testbed_controllers(self, element_guids, data):
864         def resolve_create_netref(data, guid, name, value): 
865             # Try to resolve create-time netrefs, if possible
866             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
867                 try:
868                     nuvalue = self.resolve_netref_value(value)
869                 except:
870                     # Any trouble means we're not in shape to resolve the netref yet
871                     nuvalue = None
872                 if nuvalue is not None:
873                     # Only if we succeed we remove the netref deferral entry
874                     value = nuvalue
875                     data.set_attribute_data(guid, name, value)
876                     if (testbed_guid, guid) in self._netrefs:
877                         self._netrefs[(testbed_guid, guid)].discard(name)
878             return value
879
880         for guid in element_guids:
881             (testbed_guid, factory_id) = data.get_box_data(guid)
882             testbed = self._testbeds.get(testbed_guid)
883             if testbed is not None:
884                 # create
885                 testbed.defer_create(guid, factory_id)
886                 # set attributes
887                 for (name, value) in data.get_attribute_data(guid):
888                     value = resolve_create_netref(data, guid, name, value)
889                     testbed.defer_create_set(guid, name, value)
890
891         for guid in element_guids:
892             (testbed_guid, factory_id) = data.get_box_data(guid)
893             testbed = self._testbeds.get(testbed_guid)
894             if testbed is not None:
895                 # traces
896                 for trace_id in data.get_trace_data(guid):
897                     testbed.defer_add_trace(guid, trace_id)
898                 # addresses
899                 for (address, netprefix, broadcast) in data.get_address_data(guid):
900                     if address != None:
901                         testbed.defer_add_address(guid, address, netprefix, 
902                                 broadcast)
903                 # routes
904                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
905                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
906                 # store connections data
907                 for (connector_type_name, other_guid, other_connector_type_name) \
908                         in data.get_connection_data(guid):
909                     (other_testbed_guid, other_factory_id) = data.get_box_data(
910                             other_guid)
911                     if testbed_guid == other_testbed_guid:
912                         # each testbed should take care of enforcing internal
913                         # connection simmetry, so each connection is only
914                         # added in one direction
915                         testbed.defer_connect(guid, connector_type_name, 
916                                 other_guid, other_connector_type_name)
917
918     def _program_testbed_cross_connections(self, data):
919         data_guids = data.guids
920         for guid in data_guids: 
921             if not data.is_testbed_data(guid):
922                 (testbed_guid, factory_id) = data.get_box_data(guid)
923                 testbed = self._testbeds.get(testbed_guid)
924                 if testbed is not None:
925                     for (connector_type_name, cross_guid, cross_connector_type_name) \
926                             in data.get_connection_data(guid):
927                         (testbed_guid, factory_id) = data.get_box_data(guid)
928                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
929                                 cross_guid)
930                         if testbed_guid != cross_testbed_guid:
931                             cross_testbed = self._testbeds[cross_testbed_guid]
932                             cross_testbed_id = cross_testbed.testbed_id
933                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
934                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
935                                     cross_connector_type_name)
936                             # save cross data for later
937                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
938                                     cross_guid)
939
940     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
941         if testbed_guid not in self._cross_data:
942             self._cross_data[testbed_guid] = dict()
943         if cross_testbed_guid not in self._cross_data[testbed_guid]:
944             self._cross_data[testbed_guid][cross_testbed_guid] = set()
945         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
946
947     def _get_cross_data(self, testbed_guid):
948         cross_data = dict()
949         if not testbed_guid in self._cross_data:
950             return cross_data
951
952         # fetch attribute lists in one batch
953         attribute_lists = dict()
954         for cross_testbed_guid, guid_list in \
955                 self._cross_data[testbed_guid].iteritems():
956             cross_testbed = self._testbeds[cross_testbed_guid]
957             for cross_guid in guid_list:
958                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
959                     cross_testbed.get_attribute_list_deferred(cross_guid)
960
961         # fetch attribute values in another batch
962         for cross_testbed_guid, guid_list in \
963                 self._cross_data[testbed_guid].iteritems():
964             cross_data[cross_testbed_guid] = dict()
965             cross_testbed = self._testbeds[cross_testbed_guid]
966             for cross_guid in guid_list:
967                 elem_cross_data = dict(
968                     _guid = cross_guid,
969                     _testbed_guid = cross_testbed_guid,
970                     _testbed_id = cross_testbed.testbed_id,
971                     _testbed_version = cross_testbed.testbed_version)
972                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
973                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
974                 for attr_name in attribute_list:
975                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
976                     elem_cross_data[attr_name] = attr_value
977         
978         # undefer all values - we'll have to serialize them probably later
979         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
980             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
981                 for attr_name, attr_value in elem_cross_data.iteritems():
982                     elem_cross_data[attr_name] = _undefer(attr_value)
983         
984         return cross_data
985