bug fix in GUID/label replacement.
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16
# Matches a netref of the form {#[<label>]<expr>#}, where <expr> is an
# optional component selector (.addr[N], .route[N] or .trace[N]) followed by
# a literal dot and the bracketed attribute name, for example:
#   {#[GUID-3].addr[0].[Address]#}    or    {#[node1].[hostname]#}
# The dot before the attribute bracket is escaped so that only a real "."
# is accepted as separator (an unescaped "." would match any character).
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-format template with the same overall shape as a netref; it takes a
# "guid" value for the label slot and an "expr" value for the expression.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a component such as "addr[0]" into kind ("addr") and index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
20
21 def _undefer(deferred):
22     if hasattr(deferred, '_get'):
23         return deferred._get()
24     else:
25         return deferred
26
27
class TestbedController(object):
    """
    Base interface for testbed controllers.

    Only the testbed id and version are stored here; every other operation
    raises NotImplementedError in this class (presumably concrete testbed
    implementations override them).
    """
    def __init__(self, testbed_id, testbed_version):
        """Remember the identifier and version of the testbed."""
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction time."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction time."""
        return self._testbed_version

    @property
    def guids(self):
        """Not implemented here (presumably the guids handled by the testbed)."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element """
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and 
        their attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed elements
        are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Not implemented in the base class."""
        raise NotImplementedError

    def stop(self):
        """Not implemented in the base class."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Not implemented in the base class."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Not implemented in the base class."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """Not implemented in the base class."""
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Not implemented in the base class."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Not implemented in the base class."""
        raise NotImplementedError

    def status(self, guid):
        """Not implemented in the base class."""
        raise NotImplementedError
    
    def testbed_status(self):
        """Not implemented in the base class."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Not implemented in the base class."""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Not implemented in the base class."""
        raise NotImplementedError
226
227 class ExperimentController(object):
228     def __init__(self, experiment_xml, root_dir):
229         self._experiment_design_xml = experiment_xml
230         self._experiment_execute_xml = None
231         self._testbeds = dict()
232         self._deployment_config = dict()
233         self._netrefs = collections.defaultdict(set)
234         self._testbed_netrefs = collections.defaultdict(set)
235         self._cross_data = dict()
236         self._root_dir = root_dir
237         self._netreffed_testbeds = set()
238         self._guids_in_testbed_cache = dict()
239         self._failed_testbeds = set()
240         self._started_time = None
241         self._stopped_time = None
242         
243         if experiment_xml is None and root_dir is not None:
244             # Recover
245             self.load_experiment_xml()
246             self.load_execute_xml()
247         else:
248             self.persist_experiment_xml()
249
250     @property
251     def experiment_design_xml(self):
252         return self._experiment_design_xml
253
254     @property
255     def experiment_execute_xml(self):
256         return self._experiment_execute_xml
257
258     @property
259     def started_time(self):
260         return self._started_time
261
262     @property
263     def stopped_time(self):
264         return self._stopped_time
265
266     @property
267     def guids(self):
268         guids = list()
269         for testbed_guid in self._testbeds.keys():
270             _guids = self._guids_in_testbed(testbed_guid)
271             if _guids:
272                 guids.extend(_guids)
273         return guids
274
275     def persist_experiment_xml(self):
276         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
277         f = open(xml_path, "w")
278         f.write(self._experiment_design_xml)
279         f.close()
280
281     def persist_execute_xml(self):
282         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
283         f = open(xml_path, "w")
284         f.write(self._experiment_execute_xml)
285         f.close()
286
287     def load_experiment_xml(self):
288         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
289         f = open(xml_path, "r")
290         self._experiment_design_xml = f.read()
291         f.close()
292
293     def load_execute_xml(self):
294         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
295         f = open(xml_path, "r")
296         self._experiment_execute_xml = f.read()
297         f.close()
298
299     def trace(self, guid, trace_id, attribute='value'):
300         testbed = self._testbed_for_guid(guid)
301         if testbed != None:
302             return testbed.trace(guid, trace_id, attribute)
303         raise RuntimeError("No element exists with guid %d" % guid)    
304
305     def traces_info(self):
306         traces_info = dict()
307         for guid, testbed in self._testbeds.iteritems():
308             tinfo = testbed.traces_info()
309             if tinfo:
310                 traces_info[guid] = testbed.traces_info()
311         return traces_info
312
313     @staticmethod
314     def _parallel(callables):
315         excs = []
316         def wrap(callable):
317             @functools.wraps(callable)
318             def wrapped(*p, **kw):
319                 try:
320                     callable(*p, **kw)
321                 except:
322                     import traceback
323                     traceback.print_exc(file=sys.stderr)
324                     excs.append(sys.exc_info())
325             return wrapped
326         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
327         for thread in threads:
328             thread.start()
329         for thread in threads:
330             thread.join()
331         for exc in excs:
332             eTyp, eVal, eLoc = exc
333             raise eTyp, eVal, eLoc
334
335     def start(self):
336         self._started_time = time.time() 
337         self._start()
338
    def _start(self, recover = False):
        """
        Run the whole deployment sequence over all testbeds.

        The experiment description is parsed (execute XML when ``recover``
        is true, design XML otherwise), testbed controllers are initialized,
        and the controllers to (re)start go through setup, create, connect
        and preconfigure. Netrefs are then resolved, cross connections are
        programmed and performed, and finally the testbeds are started.

        In recover mode the controllers reported as recoverable get
        do_setup() and recover() called instead, and any exception there
        marks the testbed as failed. When not recovering, the testbed access
        configuration and the execute XML are persisted along the way.
        """
        parser = XmlExperimentParser()
        
        if recover:
            xml = self._experiment_execute_xml
        else:
            xml = self._experiment_design_xml
        data = parser.from_xml_to_data(xml)

        # instantiate testbed controllers
        to_recover, to_restart = self._init_testbed_controllers(data, recover)
        all_restart = set(to_restart)
        
        if not recover:
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()
        else:
            # recover recoverable controllers
            for guid in to_recover:
                try:
                    self._testbeds[guid].do_setup()
                    self._testbeds[guid].recover()
                except:
                    # Mark failed (any exception at all counts as a failure)
                    self._failed_testbeds.add(guid)
    
        def steps_to_configure(self, allowed_guids):
            # Runs each configuration stage on the testbeds listed in
            # allowed_guids; every stage is a barrier, since _parallel
            # joins all threads before returning.

            # perform setup in parallel for all test beds,
            # wait for all threads to finish
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        steps_to_configure(self, to_restart)

        if self._netreffed_testbeds:
            # initially resolve netrefs (unresolved ones are tolerated here)
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            # NOTE(review): this local copy is never read afterwards
            netreffed_testbeds = set(self._netreffed_testbeds)

            to_recover, to_restart = self._init_testbed_controllers(data, recover)
            all_restart.update(to_restart)
            
            if not recover:
                # persist testbed connection data, for potential recovery
                self._persist_testbed_proxies()
            else:
                # recover recoverable controllers
                for guid in to_recover:
                    try:
                        self._testbeds[guid].do_setup()
                        self._testbeds[guid].recover()
                    except:
                        # Mark failed (any exception at all counts as a failure)
                        self._failed_testbeds.add(guid)

            # configure dependent testbeds
            steps_to_configure(self, to_restart)
        
        # from here on all_restart holds controller objects, not guids
        all_restart = [ self._testbeds[guid] for guid in all_restart ]
            
        # intermediate netref step: resolve what can be resolved now,
        # without failing on what is still undefined
        self.do_netrefs(data, fail_if_undefined=False)
       
        # Only now, that netref dependencies have been solved, it is safe to
        # program cross_connections
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure
                        for testbed in all_restart])

        self._clear_caches()

        #print >>sys.stderr, "DO IT"
        #import time
        #time.sleep(60)
        
        # cross-connect (cannot be done in parallel)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._parallel([testbed.do_prestart
                        for testbed in all_restart])

        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
 
        self._clear_caches()
        
        if not recover:
            # update execution xml with execution-specific values
            # TODO: BUG! Buggy code! It cannot cope with serializing all
            # attribute values (e.g. tun_key, which is non-ASCII)
            self._update_execute_xml()
            self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start
                        for testbed in all_restart])

        self._clear_caches()
469
470     def _clear_caches(self):
471         # Cleaning cache for safety.
472         self._guids_in_testbed_cache = dict()
473
474     def _persist_testbed_proxies(self):
475         TRANSIENT = (DC.RECOVER,)
476         
477         # persist access configuration for all testbeds, so that
478         # recovery mode can reconnect to them if it becomes necessary
479         conf = ConfigParser.RawConfigParser()
480         for testbed_guid, testbed_config in self._deployment_config.iteritems():
481             testbed_guid = str(testbed_guid)
482             conf.add_section(testbed_guid)
483             for attr in testbed_config.get_attribute_list():
484                 if attr not in TRANSIENT:
485                     conf.set(testbed_guid, attr, 
486                         testbed_config.get_attribute_value(attr))
487         
488         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
489         conf.write(f)
490         f.close()
491     
492     def _load_testbed_proxies(self):
493         TYPEMAP = {
494             Attribute.STRING : 'get',
495             Attribute.BOOL : 'getboolean',
496             Attribute.ENUM : 'get',
497             Attribute.DOUBLE : 'getfloat',
498             Attribute.INTEGER : 'getint',
499         }
500         
501         TRANSIENT = (DC.RECOVER,)
502         
503         # deferred import because proxy needs
504         # our class definitions to define proxies
505         import nepi.util.proxy as proxy
506         
507         conf = ConfigParser.RawConfigParser()
508         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
509         for testbed_guid in conf.sections():
510             testbed_config = proxy.AccessConfiguration()
511             testbed_guid = str(testbed_guid)
512             for attr in testbed_config.get_attribute_list():
513                 if attr not in TRANSIENT:
514                     getter = getattr(conf, TYPEMAP.get(
515                         testbed_config.get_attribute_type(attr),
516                         'get') )
517                     testbed_config.set_attribute_value(
518                         attr, getter(testbed_guid, attr))
519     
520     def _unpersist_testbed_proxies(self):
521         try:
522             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
523         except:
524             # Just print exceptions, this is just cleanup
525             import traceback
526             ######## BUG ##########
527             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
528             #traceback.print_exc(file=sys.stderr)
529
530     def _update_execute_xml(self):
531         # For all testbeds,
532         #   For all elements in testbed,
533         #       - gather immutable execute-readable attribuets lists
534         #         asynchronously
535         # Generate new design description from design xml
536         # (Wait for attributes lists - implicit syncpoint)
537         # For all testbeds,
538         #   For all elements in testbed,
539         #       - gather all immutable execute-readable attribute
540         #         values, asynchronously
541         # (Wait for attribute values - implicit syncpoint)
542         # For all testbeds,
543         #   For all elements in testbed,
544         #       - inject non-None values into new design
545         # Generate execute xml from new design
546
547         attribute_lists = dict(
548             (testbed_guid, collections.defaultdict(dict))
549             for testbed_guid in self._testbeds
550         )
551         
552         for testbed_guid, testbed in self._testbeds.iteritems():
553             guids = self._guids_in_testbed(testbed_guid)
554             for guid in guids:
555                 attribute_lists[testbed_guid][guid] = \
556                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
557         
558         parser = XmlExperimentParser()
559         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
560
561         attribute_values = dict(
562             (testbed_guid, collections.defaultdict(dict))
563             for testbed_guid in self._testbeds
564         )
565         
566         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
567             testbed = self._testbeds[testbed_guid]
568             for guid, attribute_list in testbed_attribute_lists.iteritems():
569                 attribute_list = _undefer(attribute_list)
570                 attribute_values[testbed_guid][guid] = dict(
571                     (attribute, testbed.get_deferred(guid, attribute))
572                     for attribute in attribute_list
573                 )
574         
575         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
576             for guid, attribute_values in testbed_attribute_values.iteritems():
577                 for attribute, value in attribute_values.iteritems():
578                     value = _undefer(value)
579                     if value is not None:
580                         execute_data.add_attribute_data(guid, attribute, value)
581         
582         self._experiment_execute_xml = parser.to_xml(data=execute_data)
583
584     def stop(self):
585        for testbed in self._testbeds.values():
586            testbed.stop()
587        self._unpersist_testbed_proxies()
588        self._stopped_time = time.time() 
589    
590     def recover(self):
591         # reload perviously persisted testbed access configurations
592         self._failed_testbeds.clear()
593         self._load_testbed_proxies()
594
595         # re-program testbeds that need recovery
596         self._start(recover = True)
597
598     def is_finished(self, guid):
599         testbed = self._testbed_for_guid(guid)
600         if testbed != None:
601             return testbed.status(guid) == AS.STATUS_FINISHED
602         raise RuntimeError("No element exists with guid %d" % guid)    
603     
604     def _testbed_recovery_policy(self, guid, data = None):
605         if data is None:
606             parser = XmlExperimentParser()
607             data = parser.from_xml_to_data(self._experiment_design_xml)
608         
609         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
610
611     def status(self, guid):
612         if guid in self._testbeds:
613             # guid is a testbed
614             # report testbed status
615             if guid in self._failed_testbeds:
616                 return TS.STATUS_FAILED
617             else:
618                 try:
619                     return self._testbeds[guid].status()
620                 except:
621                     return TS.STATUS_UNRESPONSIVE
622         else:
623             # guid is an element
624             testbed = self._testbed_for_guid(guid)
625             if testbed is not None:
626                 return testbed.status(guid)
627             else:
628                 return AS.STATUS_UNDETERMINED
629
630     def set(self, guid, name, value, time = TIME_NOW):
631         testbed = self._testbed_for_guid(guid)
632         if testbed != None:
633             testbed.set(guid, name, value, time)
634         else:
635             raise RuntimeError("No element exists with guid %d" % guid)    
636
637     def get(self, guid, name, time = TIME_NOW):
638         testbed = self._testbed_for_guid(guid)
639         if testbed != None:
640             return testbed.get(guid, name, time)
641         raise RuntimeError("No element exists with guid %d" % guid)    
642
643     def get_deferred(self, guid, name, time = TIME_NOW):
644         testbed = self._testbed_for_guid(guid)
645         if testbed != None:
646             return testbed.get_deferred(guid, name, time)
647         raise RuntimeError("No element exists with guid %d" % guid)    
648
649     def get_factory_id(self, guid):
650         testbed = self._testbed_for_guid(guid)
651         if testbed != None:
652             return testbed.get_factory_id(guid)
653         raise RuntimeError("No element exists with guid %d" % guid)    
654
655     def get_testbed_id(self, guid):
656         testbed = self._testbed_for_guid(guid)
657         if testbed != None:
658             return testbed.testbed_id
659         raise RuntimeError("No element exists with guid %d" % guid)    
660
661     def get_testbed_version(self, guid):
662         testbed = self._testbed_for_guid(guid)
663         if testbed != None:
664             return testbed.testbed_version
665         raise RuntimeError("No element exists with guid %d" % guid)    
666
667     def shutdown(self):
668         exceptions = list()
669         for testbed in self._testbeds.values():
670             try:
671                 testbed.shutdown()
672             except:
673                 exceptions.append(sys.exc_info())
674         for exc_info in exceptions:
675             raise exc_info[0], exc_info[1], exc_info[2]
676
677     def _testbed_for_guid(self, guid):
678         for testbed_guid in self._testbeds.keys():
679             if guid in self._guids_in_testbed(testbed_guid):
680                 if testbed_guid in self._failed_testbeds:
681                     return None
682                 return self._testbeds[testbed_guid]
683         return None
684
685     def _guids_in_testbed(self, testbed_guid):
686         if testbed_guid not in self._testbeds:
687             return set()
688         if testbed_guid not in self._guids_in_testbed_cache:
689             self._guids_in_testbed_cache[testbed_guid] = \
690                 set(self._testbeds[testbed_guid].guids)
691         return self._guids_in_testbed_cache[testbed_guid]
692
693     @staticmethod
694     def _netref_component_split(component):
695         match = COMPONENT_PATTERN.match(component)
696         if match:
697             return match.group("kind"), match.group("index")
698         else:
699             return component, None
700
    # Dispatch table used when resolving netrefs: maps a component kind to a
    # callable (testbed, guid, index, name) that fetches the referenced
    # value from the testbed. The empty kind stands for a plain attribute.
    _NETREF_COMPONENT_GETTERS = {
        # address entry: index is converted to int
        'addr':
            lambda testbed, guid, index, name: 
                testbed.get_address(guid, int(index), name),
        # routing entry: index is converted to int
        'route' :
            lambda testbed, guid, index, name: 
                testbed.get_route(guid, int(index), name),
        # trace: index is passed through as-is, as the trace id
        'trace' :
            lambda testbed, guid, index, name: 
                testbed.trace(guid, index, name),
        # no component: index is ignored
        '' : 
            lambda testbed, guid, index, name: 
                testbed.get(guid, name),
    }
715     
716     def resolve_netref_value(self, value, failval = None):
717         rv = failval
718         while True:
719             for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
720                 label = match.group("label")
721                 if label.startswith('GUID-'):
722                     ref_guid = int(label[5:])
723                     if ref_guid:
724                         expr = match.group("expr")
725                         component = (match.group("component") or "")[1:] # skip the dot
726                         attribute = match.group("attribute")
727                         
728                         # split compound components into component kind and index
729                         # eg: 'addr[0]' -> ('addr', '0')
730                         component, component_index = self._netref_component_split(component)
731
732                         # find object and resolve expression
733                         for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
734                             if component not in self._NETREF_COMPONENT_GETTERS:
735                                 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
736                             elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
737                                 pass
738                             else:
739                                 ref_value = self._NETREF_COMPONENT_GETTERS[component](
740                                     ref_testbed, ref_guid, component_index, attribute)
741                                 if ref_value:
742                                     value = rv = value.replace(match.group(), ref_value)
743                                     break
744                         else:
745                             # unresolvable netref
746                             return failval
747                         break
748             else:
749                 break
750         return rv
751     
752     def do_netrefs(self, data, fail_if_undefined = False):
753         # element netrefs
754         for (testbed_guid, guid), attrs in self._netrefs.items():
755             testbed = self._testbeds.get(testbed_guid)
756             if testbed is not None:
757                 for name in set(attrs):
758                     value = testbed.get(guid, name)
759                     if isinstance(value, basestring):
760                         ref_value = self.resolve_netref_value(value)
761                         if ref_value is not None:
762                             testbed.set(guid, name, ref_value)
763                             attrs.remove(name)
764                         elif fail_if_undefined:
765                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
766                 if not attrs:
767                     del self._netrefs[(testbed_guid, guid)]
768         
769         # testbed netrefs
770         for testbed_guid, attrs in self._testbed_netrefs.items():
771             tb_data = dict(data.get_attribute_data(testbed_guid))
772             if data:
773                 for name in set(attrs):
774                     value = tb_data.get(name)
775                     if isinstance(value, basestring):
776                         ref_value = self.resolve_netref_value(value)
777                         if ref_value is not None:
778                             data.set_attribute_data(testbed_guid, name, ref_value)
779                             attrs.remove(name)
780                         elif fail_if_undefined:
781                             raise ValueError, "Unresolvable netref in: %r" % (value,)
782                 if not attrs:
783                     del self._testbed_netrefs[testbed_guid]
784         
785
786     def _init_testbed_controllers(self, data, recover = False):
787         blacklist_testbeds = set(self._testbeds)
788         element_guids = list()
789         label_guids = dict()
790         data_guids = data.guids
791         to_recover = set()
792         to_restart = set()
793
794         # gather label associations
795         for guid in data_guids:
796             if not data.is_testbed_data(guid):
797                 (testbed_guid, factory_id) = data.get_box_data(guid)
798                 label = data.get_attribute_data(guid, "label")
799                 if label is not None:
800                     if label in label_guids:
801                         raise RuntimeError, "Label %r is not unique" % (label,)
802                     label_guids[label] = guid
803
804         # create testbed controllers
805         for guid in data_guids:
806             if data.is_testbed_data(guid):
807                 if guid not in self._testbeds:
808                     try:
809                         self._create_testbed_controller(
810                             guid, data, element_guids, recover)
811                         if recover:
812                             # Already programmed
813                             blacklist_testbeds.add(guid)
814                         else:
815                             to_restart.add(guid)
816                     except:
817                         if recover:
818                             policy = self._testbed_recovery_policy(guid, data=data)
819                             if policy == DC.POLICY_RECOVER:
820                                 self._create_testbed_controller(
821                                     guid, data, element_guids, False)
822                                 to_recover.add(guid)
823                             elif policy == DC.POLICY_RESTART:
824                                 self._create_testbed_controller(
825                                     guid, data, element_guids, False)
826                                 to_restart.add(guid)
827                             else:
828                                 # Mark failed
829                                 self._failed_testbeds.add(guid)
830                         else:
831                             raise
832         
833         # queue programmable elements
834         #  - that have not been programmed already (blacklist_testbeds)
835         #  - including recovered or restarted testbeds
836         #  - but those that have no unresolved netrefs
837         for guid in data_guids:
838             if not data.is_testbed_data(guid):
839                 (testbed_guid, factory_id) = data.get_box_data(guid)
840                 if testbed_guid not in blacklist_testbeds:
841                     element_guids.append(guid)
842
843         # replace references to elements labels for its guid
844         self._resolve_labels(data, data_guids, label_guids)
845     
846         # program testbed controllers
847         if element_guids:
848             self._program_testbed_controllers(element_guids, data)
849         
850         return to_recover, to_restart
851
852     def _resolve_labels(self, data, data_guids, label_guids):
853         netrefs = self._netrefs
854         testbed_netrefs = self._testbed_netrefs
855         for guid in data_guids:
856             for name, value in data.get_attribute_data(guid):
857                 if isinstance(value, basestring):
858                     while True:
859                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
860                             label = match.group("label")
861                             if not label.startswith('GUID-'):
862                                 ref_guid = label_guids.get(label)
863                                 if ref_guid is not None:
864                                     value = value.replace(
865                                         match.group(),
866                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
867                                             guid = 'GUID-%d' % (ref_guid,),
868                                             expr = match.group("expr"),
869                                             label = label)
870                                     )
871                                     data.set_attribute_data(guid, name, value)
872                                     
873                                     # memorize which guid-attribute pairs require
874                                     # postprocessing, to avoid excessive controller-testbed
875                                     # communication at configuration time
876                                     # (which could require high-latency network I/O)
877                                     if not data.is_testbed_data(guid):
878                                         (testbed_guid, factory_id) = data.get_box_data(guid)
879                                         netrefs[(testbed_guid, guid)].add(name)
880                                     else:
881                                         testbed_netrefs[guid].add(name)
882                                     
883                                     break
884                         else:
885                             break
886
887     def _create_testbed_controller(self, guid, data, element_guids, recover):
888         (testbed_id, testbed_version) = data.get_testbed_data(guid)
889         deployment_config = self._deployment_config.get(guid)
890         
891         # deferred import because proxy needs
892         # our class definitions to define proxies
893         import nepi.util.proxy as proxy
894         
895         if deployment_config is None:
896             # need to create one
897             deployment_config = proxy.AccessConfiguration()
898             
899             for (name, value) in data.get_attribute_data(guid):
900                 if value is not None and deployment_config.has_attribute(name):
901                     # if any deployment config attribute has a netref, we can't
902                     # create this controller yet
903                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
904                         # remember to re-issue this one
905                         self._netreffed_testbeds.add(guid)
906                         return
907                     
908                     # copy deployment config attribute
909                     deployment_config.set_attribute_value(name, value)
910             
911             # commit config
912             self._deployment_config[guid] = deployment_config
913         
914         if deployment_config is not None:
915             # force recovery mode 
916             deployment_config.set_attribute_value("recover",recover)
917         
918         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
919                 deployment_config)
920         for (name, value) in data.get_attribute_data(guid):
921             testbed.defer_configure(name, value)
922         self._testbeds[guid] = testbed
923         if guid in self._netreffed_testbeds:
924             self._netreffed_testbeds.remove(guid)
925
926     def _program_testbed_controllers(self, element_guids, data):
927         def resolve_create_netref(data, guid, name, value): 
928             # Try to resolve create-time netrefs, if possible
929             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
930                 try:
931                     nuvalue = self.resolve_netref_value(value)
932                 except:
933                     # Any trouble means we're not in shape to resolve the netref yet
934                     nuvalue = None
935                 if nuvalue is not None:
936                     # Only if we succeed we remove the netref deferral entry
937                     value = nuvalue
938                     data.set_attribute_data(guid, name, value)
939                     if (testbed_guid, guid) in self._netrefs:
940                         self._netrefs[(testbed_guid, guid)].discard(name)
941             return value
942
943         for guid in element_guids:
944             (testbed_guid, factory_id) = data.get_box_data(guid)
945             testbed = self._testbeds.get(testbed_guid)
946             if testbed is not None:
947                 # create
948                 testbed.defer_create(guid, factory_id)
949                 # set attributes
950                 for (name, value) in data.get_attribute_data(guid):
951                     value = resolve_create_netref(data, guid, name, value)
952                     testbed.defer_create_set(guid, name, value)
953
954         for guid in element_guids:
955             (testbed_guid, factory_id) = data.get_box_data(guid)
956             testbed = self._testbeds.get(testbed_guid)
957             if testbed is not None:
958                 # traces
959                 for trace_id in data.get_trace_data(guid):
960                     testbed.defer_add_trace(guid, trace_id)
961                 # addresses
962                 for (address, netprefix, broadcast) in data.get_address_data(guid):
963                     if address != None:
964                         testbed.defer_add_address(guid, address, netprefix, 
965                                 broadcast)
966                 # routes
967                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
968                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
969                 # store connections data
970                 for (connector_type_name, other_guid, other_connector_type_name) \
971                         in data.get_connection_data(guid):
972                     (other_testbed_guid, other_factory_id) = data.get_box_data(
973                             other_guid)
974                     if testbed_guid == other_testbed_guid:
975                         # each testbed should take care of enforcing internal
976                         # connection simmetry, so each connection is only
977                         # added in one direction
978                         testbed.defer_connect(guid, connector_type_name, 
979                                 other_guid, other_connector_type_name)
980
981     def _program_testbed_cross_connections(self, data):
982         data_guids = data.guids
983         for guid in data_guids: 
984             if not data.is_testbed_data(guid):
985                 (testbed_guid, factory_id) = data.get_box_data(guid)
986                 testbed = self._testbeds.get(testbed_guid)
987                 if testbed is not None:
988                     for (connector_type_name, cross_guid, cross_connector_type_name) \
989                             in data.get_connection_data(guid):
990                         (testbed_guid, factory_id) = data.get_box_data(guid)
991                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
992                                 cross_guid)
993                         if testbed_guid != cross_testbed_guid:
994                             cross_testbed = self._testbeds[cross_testbed_guid]
995                             cross_testbed_id = cross_testbed.testbed_id
996                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
997                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
998                                     cross_connector_type_name)
999                             # save cross data for later
1000                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1001                                     cross_guid)
1002
1003     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1004         if testbed_guid not in self._cross_data:
1005             self._cross_data[testbed_guid] = dict()
1006         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1007             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1008         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1009
1010     def _get_cross_data(self, testbed_guid):
1011         cross_data = dict()
1012         if not testbed_guid in self._cross_data:
1013             return cross_data
1014
1015         # fetch attribute lists in one batch
1016         attribute_lists = dict()
1017         for cross_testbed_guid, guid_list in \
1018                 self._cross_data[testbed_guid].iteritems():
1019             cross_testbed = self._testbeds[cross_testbed_guid]
1020             for cross_guid in guid_list:
1021                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1022                     cross_testbed.get_attribute_list_deferred(cross_guid)
1023
1024         # fetch attribute values in another batch
1025         for cross_testbed_guid, guid_list in \
1026                 self._cross_data[testbed_guid].iteritems():
1027             cross_data[cross_testbed_guid] = dict()
1028             cross_testbed = self._testbeds[cross_testbed_guid]
1029             for cross_guid in guid_list:
1030                 elem_cross_data = dict(
1031                     _guid = cross_guid,
1032                     _testbed_guid = cross_testbed_guid,
1033                     _testbed_id = cross_testbed.testbed_id,
1034                     _testbed_version = cross_testbed.testbed_version)
1035                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1036                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1037                 for attr_name in attribute_list:
1038                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1039                     elem_cross_data[attr_name] = attr_value
1040         
1041         # undefer all values - we'll have to serialize them probably later
1042         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1043             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1044                 for attr_name, attr_value in elem_cross_data.iteritems():
1045                     elem_cross_data[attr_name] = _undefer(attr_value)
1046         
1047         return cross_data
1048