Ticket 87: "[NEPI] Experiment started on..."
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16
# Matches a netref expression embedded in an attribute value, for example
# ``{#[GUID-5].addr[0].[Address]#}`` or ``{#[node1].[up]#}``. Named groups:
#   label     - text inside the first pair of brackets
#   component - optional ``.addr[N]``, ``.route[N]`` or ``.trace[N]`` selector
#   attribute - text inside the last pair of brackets
#   expr      - everything between the label and the closing ``#}``
# The separator before ``[attribute]`` is a literal dot; it is escaped so that
# arbitrary characters are not accepted in its place.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template that rebuilds a netref from a ``guid`` label and an ``expr`` tail
# (presumably used to rewrite labels into GUID form; the user is outside this chunk).
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as ``addr[0]`` into its kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20
21 def _undefer(deferred):
22     if hasattr(deferred, '_get'):
23         return deferred._get()
24     else:
25         return deferred
26
27
class TestbedController(object):
    """
    Abstract interface of a testbed controller.

    Only the testbed id/version accessors are implemented here; every
    other method raises NotImplementedError and must be provided by a
    concrete testbed implementation.
    """
    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction time"""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction time"""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements handled by this testbed (abstract)"""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of an element"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and 
        their attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed elements
        are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections.
        Useful for early stages of configuration, for setting up whatever might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Starts the testbed (abstract)"""
        raise NotImplementedError

    def stop(self):
        """Stops the testbed (abstract)"""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Sets attribute `name` of element `guid` to `value` (abstract)"""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Returns the value of attribute `name` of element `guid` (abstract)"""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """Lists attributes of element `guid` (abstract; filtering semantics are left to implementations)"""
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Returns the factory id of element `guid` (abstract)"""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Abstract; semantics are defined by the concrete testbed"""
        raise NotImplementedError

    def status(self, guid):
        """Returns the status of element `guid` (abstract)"""
        raise NotImplementedError
    
    def testbed_status(self):
        """Returns the status of the testbed itself (abstract)"""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Returns `attribute` of trace `trace_id` of element `guid` (abstract)"""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Shuts the testbed down (abstract)"""
        raise NotImplementedError
225         raise NotImplementedError
226
227 class ExperimentController(object):
228     def __init__(self, experiment_xml, root_dir):
229         self._experiment_design_xml = experiment_xml
230         self._experiment_execute_xml = None
231         self._testbeds = dict()
232         self._deployment_config = dict()
233         self._netrefs = collections.defaultdict(set)
234         self._testbed_netrefs = collections.defaultdict(set)
235         self._cross_data = dict()
236         self._root_dir = root_dir
237         self._netreffed_testbeds = set()
238         self._guids_in_testbed_cache = dict()
239         self._failed_testbeds = set()
240         self._started_time = None
241         self._stopped_time = None
242         
243         if experiment_xml is None and root_dir is not None:
244             # Recover
245             self.load_experiment_xml()
246             self.load_execute_xml()
247         else:
248             self.persist_experiment_xml()
249
    @property
    def experiment_design_xml(self):
        """Design XML, as given to the constructor or loaded by load_experiment_xml"""
        return self._experiment_design_xml
253
    @property
    def experiment_execute_xml(self):
        """Execute XML; None until it is generated on start or loaded by load_execute_xml"""
        return self._experiment_execute_xml
257
    @property
    def started_time(self):
        """time.time() value recorded by start(), or None if start() was not called"""
        return self._started_time
261
    @property
    def stopped_time(self):
        """time.time() value recorded by stop(), or None if stop() was not called"""
        return self._stopped_time
265
266     @property
267     def guids(self):
268         guids = list()
269         for testbed_guid in self._testbeds.keys():
270             _guids = self._guids_in_testbed(testbed_guid)
271             if _guids:
272                 guids.extend(_guids)
273         return guids
274
275     def persist_experiment_xml(self):
276         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
277         f = open(xml_path, "w")
278         f.write(self._experiment_design_xml)
279         f.close()
280
281     def persist_execute_xml(self):
282         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
283         f = open(xml_path, "w")
284         f.write(self._experiment_execute_xml)
285         f.close()
286
287     def load_experiment_xml(self):
288         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
289         f = open(xml_path, "r")
290         self._experiment_design_xml = f.read()
291         f.close()
292
293     def load_execute_xml(self):
294         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
295         f = open(xml_path, "r")
296         self._experiment_execute_xml = f.read()
297         f.close()
298
299     def trace(self, guid, trace_id, attribute='value'):
300         testbed = self._testbed_for_guid(guid)
301         if testbed != None:
302             return testbed.trace(guid, trace_id, attribute)
303         raise RuntimeError("No element exists with guid %d" % guid)    
304
305     def traces_info(self):
306         traces_info = dict()
307         for guid, testbed in self._testbeds.iteritems():
308             tinfo = testbed.traces_info()
309             if tinfo:
310                 traces_info[guid] = testbed.traces_info()
311         return traces_info
312
    @staticmethod
    def _parallel(callables):
        """
        Run every callable in its own thread and wait for all of them.

        Exceptions raised inside a thread are printed to stderr and
        recorded; once all threads are joined, the first recorded
        exception (if any) is re-raised in the calling thread with its
        original traceback.
        """
        # exc_info tuples collected from the worker threads
        excs = []
        def wrap(callable):
            @functools.wraps(callable)
            def wrapped(*p, **kw):
                try:
                    callable(*p, **kw)
                except:
                    # report right away, then keep it for re-raising
                    # in the calling thread after the join
                    import traceback
                    traceback.print_exc(file=sys.stderr)
                    excs.append(sys.exc_info())
            return wrapped
        threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # the raise leaves the loop, so only the first recorded
        # exception is propagated
        for exc in excs:
            eTyp, eVal, eLoc = exc
            raise eTyp, eVal, eLoc
334
    def start(self):
        """Record the start time and launch the experiment from its design XML"""
        self._started_time = time.time() 
        self._start()
338
    def _start(self, recover = False):
        """
        Deploy and start the experiment.

        With recover=False the design XML is used; with recover=True the
        persisted execute XML is used and only the testbeds selected by
        _init_testbed_controllers are recovered or restarted.

        Sequence: create testbed controllers, run the setup/create/
        connect/preconfigure steps, resolve netrefs (instantiating the
        testbeds that depended on them), program cross connections,
        configure, cross-connect, prestart, persist the execute XML
        (not in recovery) and finally start the testbeds.
        """
        parser = XmlExperimentParser()
        
        if recover:
            xml = self._experiment_execute_xml
        else:
            xml = self._experiment_design_xml
        data = parser.from_xml_to_data(xml)

        # instantiate testbed controllers
        to_recover, to_restart = self._init_testbed_controllers(data, recover)
        # accumulates every testbed guid that goes through a full restart
        all_restart = set(to_restart)
        
        if not recover:
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()
        else:
            # recover recoverable controllers
            for guid in to_recover:
                try:
                    self._testbeds[guid].do_setup()
                    self._testbeds[guid].recover()
                except:
                    # Mark failed
                    self._failed_testbeds.add(guid)
    
        def steps_to_configure(self, allowed_guids):
            # perform setup in parallel for all testbeds,
            # wait for all threads to finish
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        steps_to_configure(self, to_restart)

        if self._netreffed_testbeds:
            # initially resolve netrefs
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            netreffed_testbeds = set(self._netreffed_testbeds)

            to_recover, to_restart = self._init_testbed_controllers(data, recover)
            all_restart.update(to_restart)
            
            if not recover:
                # persist testbed connection data, for potential recovery
                self._persist_testbed_proxies()
            else:
                # recover recoverable controllers
                for guid in to_recover:
                    try:
                        self._testbeds[guid].do_setup()
                        self._testbeds[guid].recover()
                    except:
                        # Mark failed
                        self._failed_testbeds.add(guid)

            # configure dependent testbeds
            steps_to_configure(self, to_restart)
        
        # from here on all_restart holds controllers instead of guids
        all_restart = [ self._testbeds[guid] for guid in all_restart ]
            
        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
       
        # Only now that netref dependencies have been solved it is safe to
        # program cross_connections
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure
                        for testbed in all_restart])

        self._clear_caches()

        # cross-connect (cannot be done in parallel)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._parallel([testbed.do_prestart
                        for testbed in all_restart])

        self._clear_caches()
        
        if not recover:
            # update execution xml with execution-specific values
            # TODO: known bug - the serialization cannot cope with every
            # attribute value (e.g. tun_key, which is non-ASCII)
            self._update_execute_xml()
            self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start
                        for testbed in all_restart])

        self._clear_caches()
466
467     def _clear_caches(self):
468         # Cleaning cache for safety.
469         self._guids_in_testbed_cache = dict()
470
471     def _persist_testbed_proxies(self):
472         TRANSIENT = (DC.RECOVER,)
473         
474         # persist access configuration for all testbeds, so that
475         # recovery mode can reconnect to them if it becomes necessary
476         conf = ConfigParser.RawConfigParser()
477         for testbed_guid, testbed_config in self._deployment_config.iteritems():
478             testbed_guid = str(testbed_guid)
479             conf.add_section(testbed_guid)
480             for attr in testbed_config.get_attribute_list():
481                 if attr not in TRANSIENT:
482                     conf.set(testbed_guid, attr, 
483                         testbed_config.get_attribute_value(attr))
484         
485         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
486         conf.write(f)
487         f.close()
488     
489     def _load_testbed_proxies(self):
490         TYPEMAP = {
491             Attribute.STRING : 'get',
492             Attribute.BOOL : 'getboolean',
493             Attribute.ENUM : 'get',
494             Attribute.DOUBLE : 'getfloat',
495             Attribute.INTEGER : 'getint',
496         }
497         
498         TRANSIENT = (DC.RECOVER,)
499         
500         # deferred import because proxy needs
501         # our class definitions to define proxies
502         import nepi.util.proxy as proxy
503         
504         conf = ConfigParser.RawConfigParser()
505         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
506         for testbed_guid in conf.sections():
507             testbed_config = proxy.AccessConfiguration()
508             testbed_guid = str(testbed_guid)
509             for attr in testbed_config.get_attribute_list():
510                 if attr not in TRANSIENT:
511                     getter = getattr(conf, TYPEMAP.get(
512                         testbed_config.get_attribute_type(attr),
513                         'get') )
514                     testbed_config.set_attribute_value(
515                         attr, getter(testbed_guid, attr))
516     
517     def _unpersist_testbed_proxies(self):
518         try:
519             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
520         except:
521             # Just print exceptions, this is just cleanup
522             import traceback
523             ######## BUG ##########
524             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
525             #traceback.print_exc(file=sys.stderr)
526
527     def _update_execute_xml(self):
528         # For all testbeds,
529         #   For all elements in testbed,
530         #       - gather immutable execute-readable attribuets lists
531         #         asynchronously
532         # Generate new design description from design xml
533         # (Wait for attributes lists - implicit syncpoint)
534         # For all testbeds,
535         #   For all elements in testbed,
536         #       - gather all immutable execute-readable attribute
537         #         values, asynchronously
538         # (Wait for attribute values - implicit syncpoint)
539         # For all testbeds,
540         #   For all elements in testbed,
541         #       - inject non-None values into new design
542         # Generate execute xml from new design
543
544         attribute_lists = dict(
545             (testbed_guid, collections.defaultdict(dict))
546             for testbed_guid in self._testbeds
547         )
548         
549         for testbed_guid, testbed in self._testbeds.iteritems():
550             guids = self._guids_in_testbed(testbed_guid)
551             for guid in guids:
552                 attribute_lists[testbed_guid][guid] = \
553                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
554         
555         parser = XmlExperimentParser()
556         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
557
558         attribute_values = dict(
559             (testbed_guid, collections.defaultdict(dict))
560             for testbed_guid in self._testbeds
561         )
562         
563         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
564             testbed = self._testbeds[testbed_guid]
565             for guid, attribute_list in testbed_attribute_lists.iteritems():
566                 attribute_list = _undefer(attribute_list)
567                 attribute_values[testbed_guid][guid] = dict(
568                     (attribute, testbed.get_deferred(guid, attribute))
569                     for attribute in attribute_list
570                 )
571         
572         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
573             for guid, attribute_values in testbed_attribute_values.iteritems():
574                 for attribute, value in attribute_values.iteritems():
575                     value = _undefer(value)
576                     if value is not None:
577                         execute_data.add_attribute_data(guid, attribute, value)
578         
579         self._experiment_execute_xml = parser.to_xml(data=execute_data)
580
581     def stop(self):
582        for testbed in self._testbeds.values():
583            testbed.stop()
584        self._unpersist_testbed_proxies()
585        self._stopped_time = time.time() 
586    
    def recover(self):
        """
        Recover a previously started experiment: forget earlier
        failures, reload the persisted testbed access configuration
        and re-run the start sequence in recovery mode.
        """
        # reload previously persisted testbed access configurations
        self._failed_testbeds.clear()
        self._load_testbed_proxies()

        # re-program testbeds that need recovery
        self._start(recover = True)
594
595     def is_finished(self, guid):
596         testbed = self._testbed_for_guid(guid)
597         if testbed != None:
598             return testbed.status(guid) == AS.STATUS_FINISHED
599         raise RuntimeError("No element exists with guid %d" % guid)    
600     
601     def _testbed_recovery_policy(self, guid, data = None):
602         if data is None:
603             parser = XmlExperimentParser()
604             data = parser.from_xml_to_data(self._experiment_design_xml)
605         
606         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
607
608     def status(self, guid):
609         if guid in self._testbeds:
610             # guid is a testbed
611             # report testbed status
612             if guid in self._failed_testbeds:
613                 return TS.STATUS_FAILED
614             else:
615                 try:
616                     return self._testbeds[guid].status()
617                 except:
618                     return TS.STATUS_UNRESPONSIVE
619         else:
620             # guid is an element
621             testbed = self._testbed_for_guid(guid)
622             if testbed is not None:
623                 return testbed.status(guid)
624             else:
625                 return AS.STATUS_UNDETERMINED
626
627     def set(self, guid, name, value, time = TIME_NOW):
628         testbed = self._testbed_for_guid(guid)
629         if testbed != None:
630             testbed.set(guid, name, value, time)
631         else:
632             raise RuntimeError("No element exists with guid %d" % guid)    
633
634     def get(self, guid, name, time = TIME_NOW):
635         testbed = self._testbed_for_guid(guid)
636         if testbed != None:
637             return testbed.get(guid, name, time)
638         raise RuntimeError("No element exists with guid %d" % guid)    
639
640     def get_deferred(self, guid, name, time = TIME_NOW):
641         testbed = self._testbed_for_guid(guid)
642         if testbed != None:
643             return testbed.get_deferred(guid, name, time)
644         raise RuntimeError("No element exists with guid %d" % guid)    
645
646     def get_factory_id(self, guid):
647         testbed = self._testbed_for_guid(guid)
648         if testbed != None:
649             return testbed.get_factory_id(guid)
650         raise RuntimeError("No element exists with guid %d" % guid)    
651
652     def get_testbed_id(self, guid):
653         testbed = self._testbed_for_guid(guid)
654         if testbed != None:
655             return testbed.testbed_id
656         raise RuntimeError("No element exists with guid %d" % guid)    
657
658     def get_testbed_version(self, guid):
659         testbed = self._testbed_for_guid(guid)
660         if testbed != None:
661             return testbed.testbed_version
662         raise RuntimeError("No element exists with guid %d" % guid)    
663
    def shutdown(self):
        """
        Shut every testbed down.

        A failure in one testbed does not prevent shutting down the
        others; once all have been attempted, the first recorded
        exception (if any) is re-raised with its original traceback.
        """
        exceptions = list()
        for testbed in self._testbeds.values():
            try:
                testbed.shutdown()
            except:
                # keep going, remember the failure for later
                exceptions.append(sys.exc_info())
        # the raise leaves the loop, so only the first failure is propagated
        for exc_info in exceptions:
            raise exc_info[0], exc_info[1], exc_info[2]
673
674     def _testbed_for_guid(self, guid):
675         for testbed_guid in self._testbeds.keys():
676             if guid in self._guids_in_testbed(testbed_guid):
677                 if testbed_guid in self._failed_testbeds:
678                     return None
679                 return self._testbeds[testbed_guid]
680         return None
681
682     def _guids_in_testbed(self, testbed_guid):
683         if testbed_guid not in self._testbeds:
684             return set()
685         if testbed_guid not in self._guids_in_testbed_cache:
686             self._guids_in_testbed_cache[testbed_guid] = \
687                 set(self._testbeds[testbed_guid].guids)
688         return self._guids_in_testbed_cache[testbed_guid]
689
690     @staticmethod
691     def _netref_component_split(component):
692         match = COMPONENT_PATTERN.match(component)
693         if match:
694             return match.group("kind"), match.group("index")
695         else:
696             return component, None
697
    # Dispatch table used to resolve netrefs: maps a component kind to a
    # callable (testbed, guid, index, name) that fetches the referenced
    # value from the testbed. The empty kind stands for a plain element
    # attribute, in which case the index is ignored.
    _NETREF_COMPONENT_GETTERS = {
        # address and route entries are selected by integer index
        'addr':
            lambda testbed, guid, index, name: 
                testbed.get_address(guid, int(index), name),
        'route' :
            lambda testbed, guid, index, name: 
                testbed.get_route(guid, int(index), name),
        # for traces the index is handed over as-is, as the trace id
        'trace' :
            lambda testbed, guid, index, name: 
                testbed.trace(guid, index, name),
        '' : 
            lambda testbed, guid, index, name: 
                testbed.get(guid, name),
    }
712     
    def resolve_netref_value(self, value, failval = None):
        """
        Substitute the netrefs found in `value` by the values they refer to.

        Only references whose label has the form GUID-<n> (n non-zero)
        are processed. They are substituted one at a time, rescanning
        the string after each substitution.

        Returns the string after the last substitution, or `failval`
        when nothing was substituted or when a processed reference
        could not be resolved by any testbed.

        Raises ValueError when a reference names an unknown component.
        """
        rv = failval
        while True:
            for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
                label = match.group("label")
                if label.startswith('GUID-'):
                    ref_guid = int(label[5:])
                    if ref_guid:
                        expr = match.group("expr")
                        component = (match.group("component") or "")[1:] # skip the dot
                        attribute = match.group("attribute")
                        
                        # split compound components into component kind and index
                        # eg: 'addr[0]' -> ('addr', '0')
                        component, component_index = self._netref_component_split(component)

                        # find object and resolve expression
                        for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
                            if component not in self._NETREF_COMPONENT_GETTERS:
                                raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
                            elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
                                # element lives in some other testbed
                                pass
                            else:
                                ref_value = self._NETREF_COMPONENT_GETTERS[component](
                                    ref_testbed, ref_guid, component_index, attribute)
                                # an empty/None result counts as not resolved yet
                                # NOTE(review): str.replace needs ref_value to be
                                # a string - confirm getters never return other types
                                if ref_value:
                                    value = rv = value.replace(match.group(), ref_value)
                                    break
                        else:
                            # unresolvable netref
                            return failval
                        # value changed: leave the scan and start over
                        # on the updated string
                        break
            else:
                # a full scan found nothing left to substitute
                break
        return rv
748     
    def do_netrefs(self, data, fail_if_undefined = False):
        """
        Try to resolve the pending netrefs of elements and testbeds.

        Element attributes are read from and written back to their
        testbed; testbed attributes are read from and written back to
        `data`. Attributes that get resolved are removed from the
        pending sets. Only string values are examined.

        With fail_if_undefined, a string value that cannot be resolved
        raises ValueError.
        """
        # element netrefs
        # (items() and set(attrs) give copies, so the pending
        # containers can be modified inside the loops)
        for (testbed_guid, guid), attrs in self._netrefs.items():
            testbed = self._testbeds.get(testbed_guid)
            if testbed is not None:
                for name in set(attrs):
                    value = testbed.get(guid, name)
                    if isinstance(value, basestring):
                        ref_value = self.resolve_netref_value(value)
                        if ref_value is not None:
                            testbed.set(guid, name, ref_value)
                            attrs.remove(name)
                        elif fail_if_undefined:
                            raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
                if not attrs:
                    del self._netrefs[(testbed_guid, guid)]
        
        # testbed netrefs
        for testbed_guid, attrs in self._testbed_netrefs.items():
            tb_data = dict(data.get_attribute_data(testbed_guid))
            # NOTE(review): this tests `data`, not `tb_data` - confirm
            # which one was intended
            if data:
                for name in set(attrs):
                    value = tb_data.get(name)
                    if isinstance(value, basestring):
                        ref_value = self.resolve_netref_value(value)
                        if ref_value is not None:
                            data.set_attribute_data(testbed_guid, name, ref_value)
                            attrs.remove(name)
                        elif fail_if_undefined:
                            raise ValueError, "Unresolvable netref in: %r" % (value,)
                if not attrs:
                    del self._testbed_netrefs[testbed_guid]
781         
782
783     def _init_testbed_controllers(self, data, recover = False):
784         blacklist_testbeds = set(self._testbeds)
785         element_guids = list()
786         label_guids = dict()
787         data_guids = data.guids
788         to_recover = set()
789         to_restart = set()
790
791         # gather label associations
792         for guid in data_guids:
793             if not data.is_testbed_data(guid):
794                 (testbed_guid, factory_id) = data.get_box_data(guid)
795                 label = data.get_attribute_data(guid, "label")
796                 if label is not None:
797                     if label in label_guids:
798                         raise RuntimeError, "Label %r is not unique" % (label,)
799                     label_guids[label] = guid
800
801         # create testbed controllers
802         for guid in data_guids:
803             if data.is_testbed_data(guid):
804                 if guid not in self._testbeds:
805                     try:
806                         self._create_testbed_controller(
807                             guid, data, element_guids, recover)
808                         if recover:
809                             # Already programmed
810                             blacklist_testbeds.add(guid)
811                         else:
812                             to_restart.add(guid)
813                     except:
814                         if recover:
815                             policy = self._testbed_recovery_policy(guid, data=data)
816                             if policy == DC.POLICY_RECOVER:
817                                 self._create_testbed_controller(
818                                     guid, data, element_guids, False)
819                                 to_recover.add(guid)
820                             elif policy == DC.POLICY_RESTART:
821                                 self._create_testbed_controller(
822                                     guid, data, element_guids, False)
823                                 to_restart.add(guid)
824                             else:
825                                 # Mark failed
826                                 self._failed_testbeds.add(guid)
827                         else:
828                             raise
829         
830         # queue programmable elements
831         #  - that have not been programmed already (blacklist_testbeds)
832         #  - including recovered or restarted testbeds
833         #  - but those that have no unresolved netrefs
834         for guid in data_guids:
835             if not data.is_testbed_data(guid):
836                 (testbed_guid, factory_id) = data.get_box_data(guid)
837                 if testbed_guid not in blacklist_testbeds:
838                     element_guids.append(guid)
839
840         # replace references to elements labels for its guid
841         self._resolve_labels(data, data_guids, label_guids)
842     
843         # program testbed controllers
844         if element_guids:
845             self._program_testbed_controllers(element_guids, data)
846         
847         return to_recover, to_restart
848
849     def _resolve_labels(self, data, data_guids, label_guids):
850         netrefs = self._netrefs
851         testbed_netrefs = self._testbed_netrefs
852         for guid in data_guids:
853             for name, value in data.get_attribute_data(guid):
854                 if isinstance(value, basestring):
855                     while True:
856                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
857                             label = match.group("label")
858                             if not label.startswith('GUID-'):
859                                 ref_guid = label_guids.get(label)
860                                 if ref_guid is not None:
861                                     value = ATTRIBUTE_PATTERN_BASE.sub(
862                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
863                                             guid = 'GUID-%d' % (ref_guid,),
864                                             expr = match.group("expr"),
865                                             label = label), 
866                                         value)
867                                     data.set_attribute_data(guid, name, value)
868                                     
869                                     # memorize which guid-attribute pairs require
870                                     # postprocessing, to avoid excessive controller-testbed
871                                     # communication at configuration time
872                                     # (which could require high-latency network I/O)
873                                     if not data.is_testbed_data(guid):
874                                         (testbed_guid, factory_id) = data.get_box_data(guid)
875                                         netrefs[(testbed_guid, guid)].add(name)
876                                     else:
877                                         testbed_netrefs[guid].add(name)
878                                     
879                                     break
880                         else:
881                             break
882
883     def _create_testbed_controller(self, guid, data, element_guids, recover):
884         (testbed_id, testbed_version) = data.get_testbed_data(guid)
885         deployment_config = self._deployment_config.get(guid)
886         
887         # deferred import because proxy needs
888         # our class definitions to define proxies
889         import nepi.util.proxy as proxy
890         
891         if deployment_config is None:
892             # need to create one
893             deployment_config = proxy.AccessConfiguration()
894             
895             for (name, value) in data.get_attribute_data(guid):
896                 if value is not None and deployment_config.has_attribute(name):
897                     # if any deployment config attribute has a netref, we can't
898                     # create this controller yet
899                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
900                         # remember to re-issue this one
901                         self._netreffed_testbeds.add(guid)
902                         return
903                     
904                     # copy deployment config attribute
905                     deployment_config.set_attribute_value(name, value)
906             
907             # commit config
908             self._deployment_config[guid] = deployment_config
909         
910         if deployment_config is not None:
911             # force recovery mode 
912             deployment_config.set_attribute_value("recover",recover)
913         
914         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
915                 deployment_config)
916         for (name, value) in data.get_attribute_data(guid):
917             testbed.defer_configure(name, value)
918         self._testbeds[guid] = testbed
919         if guid in self._netreffed_testbeds:
920             self._netreffed_testbeds.remove(guid)
921
922     def _program_testbed_controllers(self, element_guids, data):
923         def resolve_create_netref(data, guid, name, value): 
924             # Try to resolve create-time netrefs, if possible
925             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
926                 try:
927                     nuvalue = self.resolve_netref_value(value)
928                 except:
929                     # Any trouble means we're not in shape to resolve the netref yet
930                     nuvalue = None
931                 if nuvalue is not None:
932                     # Only if we succeed we remove the netref deferral entry
933                     value = nuvalue
934                     data.set_attribute_data(guid, name, value)
935                     if (testbed_guid, guid) in self._netrefs:
936                         self._netrefs[(testbed_guid, guid)].discard(name)
937             return value
938
939         for guid in element_guids:
940             (testbed_guid, factory_id) = data.get_box_data(guid)
941             testbed = self._testbeds.get(testbed_guid)
942             if testbed is not None:
943                 # create
944                 testbed.defer_create(guid, factory_id)
945                 # set attributes
946                 for (name, value) in data.get_attribute_data(guid):
947                     value = resolve_create_netref(data, guid, name, value)
948                     testbed.defer_create_set(guid, name, value)
949
950         for guid in element_guids:
951             (testbed_guid, factory_id) = data.get_box_data(guid)
952             testbed = self._testbeds.get(testbed_guid)
953             if testbed is not None:
954                 # traces
955                 for trace_id in data.get_trace_data(guid):
956                     testbed.defer_add_trace(guid, trace_id)
957                 # addresses
958                 for (address, netprefix, broadcast) in data.get_address_data(guid):
959                     if address != None:
960                         testbed.defer_add_address(guid, address, netprefix, 
961                                 broadcast)
962                 # routes
963                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
964                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
965                 # store connections data
966                 for (connector_type_name, other_guid, other_connector_type_name) \
967                         in data.get_connection_data(guid):
968                     (other_testbed_guid, other_factory_id) = data.get_box_data(
969                             other_guid)
970                     if testbed_guid == other_testbed_guid:
971                         # each testbed should take care of enforcing internal
972                         # connection simmetry, so each connection is only
973                         # added in one direction
974                         testbed.defer_connect(guid, connector_type_name, 
975                                 other_guid, other_connector_type_name)
976
977     def _program_testbed_cross_connections(self, data):
978         data_guids = data.guids
979         for guid in data_guids: 
980             if not data.is_testbed_data(guid):
981                 (testbed_guid, factory_id) = data.get_box_data(guid)
982                 testbed = self._testbeds.get(testbed_guid)
983                 if testbed is not None:
984                     for (connector_type_name, cross_guid, cross_connector_type_name) \
985                             in data.get_connection_data(guid):
986                         (testbed_guid, factory_id) = data.get_box_data(guid)
987                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
988                                 cross_guid)
989                         if testbed_guid != cross_testbed_guid:
990                             cross_testbed = self._testbeds[cross_testbed_guid]
991                             cross_testbed_id = cross_testbed.testbed_id
992                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
993                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
994                                     cross_connector_type_name)
995                             # save cross data for later
996                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
997                                     cross_guid)
998
999     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1000         if testbed_guid not in self._cross_data:
1001             self._cross_data[testbed_guid] = dict()
1002         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1003             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1004         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1005
1006     def _get_cross_data(self, testbed_guid):
1007         cross_data = dict()
1008         if not testbed_guid in self._cross_data:
1009             return cross_data
1010
1011         # fetch attribute lists in one batch
1012         attribute_lists = dict()
1013         for cross_testbed_guid, guid_list in \
1014                 self._cross_data[testbed_guid].iteritems():
1015             cross_testbed = self._testbeds[cross_testbed_guid]
1016             for cross_guid in guid_list:
1017                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1018                     cross_testbed.get_attribute_list_deferred(cross_guid)
1019
1020         # fetch attribute values in another batch
1021         for cross_testbed_guid, guid_list in \
1022                 self._cross_data[testbed_guid].iteritems():
1023             cross_data[cross_testbed_guid] = dict()
1024             cross_testbed = self._testbeds[cross_testbed_guid]
1025             for cross_guid in guid_list:
1026                 elem_cross_data = dict(
1027                     _guid = cross_guid,
1028                     _testbed_guid = cross_testbed_guid,
1029                     _testbed_id = cross_testbed.testbed_id,
1030                     _testbed_version = cross_testbed.testbed_version)
1031                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1032                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1033                 for attr_name in attribute_list:
1034                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1035                     elem_cross_data[attr_name] = attr_value
1036         
1037         # undefer all values - we'll have to serialize them probably later
1038         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1039             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1040                 for attr_name, attr_value in elem_cross_data.iteritems():
1041                     elem_cross_data[attr_name] = _undefer(attr_value)
1042         
1043         return cross_data
1044