Merge default
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15
16 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
17 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
18 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19
20 def _undefer(deferred):
21     if hasattr(deferred, '_get'):
22         return deferred._get()
23     else:
24         return deferred
25
26
27 class TestbedController(object):
28     def __init__(self, testbed_id, testbed_version):
29         self._testbed_id = testbed_id
30         self._testbed_version = testbed_version
31
32     @property
33     def testbed_id(self):
34         return self._testbed_id
35
36     @property
37     def testbed_version(self):
38         return self._testbed_version
39
40     @property
41     def guids(self):
42         raise NotImplementedError
43
44     def defer_configure(self, name, value):
45         """Instructs setting a configuartion attribute for the testbed instance"""
46         raise NotImplementedError
47
48     def defer_create(self, guid, factory_id):
49         """Instructs creation of element """
50         raise NotImplementedError
51
52     def defer_create_set(self, guid, name, value):
53         """Instructs setting an initial attribute on an element"""
54         raise NotImplementedError
55
56     def defer_factory_set(self, guid, name, value):
57         """Instructs setting an attribute on a factory"""
58         raise NotImplementedError
59
60     def defer_connect(self, guid1, connector_type_name1, guid2, 
61             connector_type_name2): 
62         """Instructs creation of a connection between the given connectors"""
63         raise NotImplementedError
64
65     def defer_cross_connect(self, 
66             guid, connector_type_name,
67             cross_guid, cross_testbed_guid,
68             cross_testbed_id, cross_factory_id,
69             cross_connector_type_name):
70         """
71         Instructs creation of a connection between the given connectors 
72         of different testbed instances
73         """
74         raise NotImplementedError
75
76     def defer_add_trace(self, guid, trace_id):
77         """Instructs the addition of a trace"""
78         raise NotImplementedError
79
80     def defer_add_address(self, guid, address, netprefix, broadcast): 
81         """Instructs the addition of an address"""
82         raise NotImplementedError
83
84     def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
85         """Instructs the addition of a route"""
86         raise NotImplementedError
87
88     def do_setup(self):
89         """After do_setup the testbed initial configuration is done"""
90         raise NotImplementedError
91
92     def do_create(self):
93         """
94         After do_create all instructed elements are created and 
95         attributes setted
96         """
97         raise NotImplementedError
98
99     def do_connect_init(self):
100         """
101         After do_connect_init all internal connections between testbed elements
102         are initiated
103         """
104         raise NotImplementedError
105
106     def do_connect_compl(self):
107         """
108         After do_connect all internal connections between testbed elements
109         are completed
110         """
111         raise NotImplementedError
112
113     def do_preconfigure(self):
114         """
115         Done just before resolving netrefs, after connection, before cross connections,
116         useful for early stages of configuration, for setting up stuff that might be
117         required for netref resolution.
118         """
119         raise NotImplementedError
120
121     def do_configure(self):
122         """After do_configure elements are configured"""
123         raise NotImplementedError
124
125     def do_prestart(self):
126         """Before do_start elements are prestart-configured"""
127         raise NotImplementedError
128
129     def do_cross_connect_init(self, cross_data):
130         """
131         After do_cross_connect_init initiation of all external connections 
132         between different testbed elements is performed
133         """
134         raise NotImplementedError
135
136     def do_cross_connect_compl(self, cross_data):
137         """
138         After do_cross_connect_compl completion of all external connections 
139         between different testbed elements is performed
140         """
141         raise NotImplementedError
142
143     def start(self):
144         raise NotImplementedError
145
146     def stop(self):
147         raise NotImplementedError
148
149     def set(self, guid, name, value, time = TIME_NOW):
150         raise NotImplementedError
151
152     def get(self, guid, name, time = TIME_NOW):
153         raise NotImplementedError
154     
155     def get_route(self, guid, index, attribute):
156         """
157         Params:
158             
159             guid: guid of box to query
160             index: number of routing entry to fetch
161             attribute: one of Destination, NextHop, NetPrefix
162         """
163         raise NotImplementedError
164
165     def get_address(self, guid, index, attribute='Address'):
166         """
167         Params:
168             
169             guid: guid of box to query
170             index: number of inteface to select
171             attribute: one of Address, NetPrefix, Broadcast
172         """
173         raise NotImplementedError
174
175     def get_attribute_list(self, guid, filter_flags = None, exclude = False):
176         raise NotImplementedError
177
178     def get_factory_id(self, guid):
179         raise NotImplementedError
180
181     def action(self, time, guid, action):
182         raise NotImplementedError
183
184     def status(self, guid):
185         raise NotImplementedError
186
187     def trace(self, guid, trace_id, attribute='value'):
188         raise NotImplementedError
189
190     def traces_info(self):
191         """ dictionary of dictionaries:
192             traces_info = dict({
193                 guid = dict({
194                     trace_id = dict({
195                             host = host,
196                             filepath = filepath,
197                             filesize = size in bytes,
198                         })
199                 })
200             })"""
201         raise NotImplementedError
202
203     def shutdown(self):
204         raise NotImplementedError
205
206 class ExperimentController(object):
207     def __init__(self, experiment_xml, root_dir):
208         self._experiment_design_xml = experiment_xml
209         self._experiment_execute_xml = None
210         self._testbeds = dict()
211         self._deployment_config = dict()
212         self._netrefs = collections.defaultdict(set)
213         self._testbed_netrefs = collections.defaultdict(set)
214         self._cross_data = dict()
215         self._root_dir = root_dir
216         self._netreffed_testbeds = set()
217         self._guids_in_testbed_cache = dict()
218         
219         if experiment_xml is None and root_dir is not None:
220             # Recover
221             self.load_experiment_xml()
222             self.load_execute_xml()
223         else:
224             self.persist_experiment_xml()
225
226     @property
227     def experiment_design_xml(self):
228         return self._experiment_design_xml
229
230     @property
231     def experiment_execute_xml(self):
232         return self._experiment_execute_xml
233
234     @property
235     def guids(self):
236         guids = list()
237         for testbed_guid in self._testbeds.keys():
238             _guids = self._guids_in_testbed(testbed_guid)
239             if _guids:
240                 guids.extend(_guids)
241         return guids
242
243     def persist_experiment_xml(self):
244         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
245         f = open(xml_path, "w")
246         f.write(self._experiment_design_xml)
247         f.close()
248
249     def persist_execute_xml(self):
250         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
251         f = open(xml_path, "w")
252         f.write(self._experiment_execute_xml)
253         f.close()
254
255     def load_experiment_xml(self):
256         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
257         f = open(xml_path, "r")
258         self._experiment_design_xml = f.read()
259         f.close()
260
261     def load_execute_xml(self):
262         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
263         f = open(xml_path, "r")
264         self._experiment_execute_xml = f.read()
265         f.close()
266
267     def trace(self, guid, trace_id, attribute='value'):
268         testbed = self._testbed_for_guid(guid)
269         if testbed != None:
270             return testbed.trace(guid, trace_id, attribute)
271         raise RuntimeError("No element exists with guid %d" % guid)    
272
273     def traces_info(self):
274         traces_info = dict()
275         for guid, testbed in self._testbeds.iteritems():
276             tinfo = testbed.traces_info()
277             if tinfo:
278                 traces_info[guid] = testbed.traces_info()
279         return traces_info
280
281     @staticmethod
282     def _parallel(callables):
283         excs = []
284         def wrap(callable):
285             @functools.wraps(callable)
286             def wrapped(*p, **kw):
287                 try:
288                     callable(*p, **kw)
289                 except:
290                     import traceback
291                     traceback.print_exc(file=sys.stderr)
292                     excs.append(sys.exc_info())
293             return wrapped
294         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
295         for thread in threads:
296             thread.start()
297         for thread in threads:
298             thread.join()
299         for exc in excs:
300             eTyp, eVal, eLoc = exc
301             raise eTyp, eVal, eLoc
302
303     def start(self):
304         parser = XmlExperimentParser()
305         data = parser.from_xml_to_data(self._experiment_design_xml)
306
307         # instantiate testbed controllers
308         self._init_testbed_controllers(data)
309         
310         # persist testbed connection data, for potential recovery
311         self._persist_testbed_proxies()
312         
313         def steps_to_configure(self, allowed_guids):
314             # perform setup in parallel for all test beds,
315             # wait for all threads to finish
316             self._parallel([testbed.do_setup 
317                             for guid,testbed in self._testbeds.iteritems()
318                             if guid in allowed_guids])
319        
320             # perform create-connect in parallel, wait
321             # (internal connections only)
322             self._parallel([testbed.do_create
323                             for guid,testbed in self._testbeds.iteritems()
324                             if guid in allowed_guids])
325
326             self._parallel([testbed.do_connect_init
327                             for guid,testbed in self._testbeds.iteritems()
328                             if guid in allowed_guids])
329
330             self._parallel([testbed.do_connect_compl
331                             for guid,testbed in self._testbeds.iteritems()
332                             if guid in allowed_guids])
333
334             self._parallel([testbed.do_preconfigure
335                             for guid,testbed in self._testbeds.iteritems()
336                             if guid in allowed_guids])
337             self._clear_caches()
338
339         steps_to_configure(self, self._testbeds)
340
341         if self._netreffed_testbeds:
342             # initally resolve netrefs
343             self.do_netrefs(data, fail_if_undefined=False)
344             
345             # rinse and repeat, for netreffed testbeds
346             netreffed_testbeds = set(self._netreffed_testbeds)
347
348             self._init_testbed_controllers(data)
349             
350             # persist testbed connection data, for potential recovery
351             self._persist_testbed_proxies()
352
353             # configure dependant testbeds
354             steps_to_configure(self, netreffed_testbeds)
355             
356         # final netref step, fail if anything's left unresolved
357         self.do_netrefs(data, fail_if_undefined=True)
358        
359         # Only now, that netref dependencies have been solve, it is safe to
360         # program cross_connections
361         self._program_testbed_cross_connections(data)
362  
363         # perform do_configure in parallel for al testbeds
364         # (it's internal configuration for each)
365         self._parallel([testbed.do_configure
366                         for testbed in self._testbeds.itervalues()])
367
368         self._clear_caches()
369
370         #print >>sys.stderr, "DO IT"
371         #import time
372         #time.sleep(60)
373         
374         # cross-connect (cannot be done in parallel)
375         for guid, testbed in self._testbeds.iteritems():
376             cross_data = self._get_cross_data(guid)
377             testbed.do_cross_connect_init(cross_data)
378         for guid, testbed in self._testbeds.iteritems():
379             cross_data = self._get_cross_data(guid)
380             testbed.do_cross_connect_compl(cross_data)
381        
382         self._clear_caches()
383
384         # Last chance to configure (parallel on all testbeds)
385         self._parallel([testbed.do_prestart
386                         for testbed in self._testbeds.itervalues()])
387
388         self._clear_caches()
389         
390         # update execution xml with execution-specific values
391         # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
392         self._update_execute_xml()
393         self.persist_execute_xml()
394
395         # start experiment (parallel start on all testbeds)
396         self._parallel([testbed.start
397                         for testbed in self._testbeds.itervalues()])
398
399         self._clear_caches()
400
401     def _clear_caches(self):
402         # Cleaning cache for safety.
403         self._guids_in_testbed_cache = dict()
404
405     def _persist_testbed_proxies(self):
406         TRANSIENT = ('Recover',)
407         
408         # persist access configuration for all testbeds, so that
409         # recovery mode can reconnect to them if it becomes necessary
410         conf = ConfigParser.RawConfigParser()
411         for testbed_guid, testbed_config in self._deployment_config.iteritems():
412             testbed_guid = str(testbed_guid)
413             conf.add_section(testbed_guid)
414             for attr in testbed_config.get_attribute_list():
415                 if attr not in TRANSIENT:
416                     conf.set(testbed_guid, attr, 
417                         testbed_config.get_attribute_value(attr))
418         
419         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
420         conf.write(f)
421         f.close()
422     
423     def _load_testbed_proxies(self):
424         TYPEMAP = {
425             Attribute.STRING : 'get',
426             Attribute.BOOL : 'getboolean',
427             Attribute.ENUM : 'get',
428             Attribute.DOUBLE : 'getfloat',
429             Attribute.INTEGER : 'getint',
430         }
431         
432         TRANSIENT = ('Recover',)
433         
434         # deferred import because proxy needs
435         # our class definitions to define proxies
436         import nepi.util.proxy as proxy
437         
438         conf = ConfigParser.RawConfigParser()
439         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
440         for testbed_guid in conf.sections():
441             testbed_config = proxy.AccessConfiguration()
442             testbed_guid = str(testbed_guid)
443             for attr in testbed_config.get_attribute_list():
444                 if attr not in TRANSIENT:
445                     getter = getattr(conf, TYPEMAP.get(
446                         testbed_config.get_attribute_type(attr),
447                         'get') )
448                     testbed_config.set_attribute_value(
449                         attr, getter(testbed_guid, attr))
450     
451     def _unpersist_testbed_proxies(self):
452         try:
453             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
454         except:
455             # Just print exceptions, this is just cleanup
456             import traceback
457             ######## BUG ##########
458             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
459             #traceback.print_exc(file=sys.stderr)
460
461     def _update_execute_xml(self):
462         # For all testbeds,
463         #   For all elements in testbed,
464         #       - gather immutable execute-readable attribuets lists
465         #         asynchronously
466         # Generate new design description from design xml
467         # (Wait for attributes lists - implicit syncpoint)
468         # For all testbeds,
469         #   For all elements in testbed,
470         #       - gather all immutable execute-readable attribute
471         #         values, asynchronously
472         # (Wait for attribute values - implicit syncpoint)
473         # For all testbeds,
474         #   For all elements in testbed,
475         #       - inject non-None values into new design
476         # Generate execute xml from new design
477
478         attribute_lists = dict(
479             (testbed_guid, collections.defaultdict(dict))
480             for testbed_guid in self._testbeds
481         )
482         
483         for testbed_guid, testbed in self._testbeds.iteritems():
484             guids = self._guids_in_testbed(testbed_guid)
485             for guid in guids:
486                 attribute_lists[testbed_guid][guid] = \
487                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
488         
489         parser = XmlExperimentParser()
490         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
491
492         attribute_values = dict(
493             (testbed_guid, collections.defaultdict(dict))
494             for testbed_guid in self._testbeds
495         )
496         
497         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
498             testbed = self._testbeds[testbed_guid]
499             for guid, attribute_list in testbed_attribute_lists.iteritems():
500                 attribute_list = _undefer(attribute_list)
501                 attribute_values[testbed_guid][guid] = dict(
502                     (attribute, testbed.get_deferred(guid, attribute))
503                     for attribute in attribute_list
504                 )
505         
506         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
507             for guid, attribute_values in testbed_attribute_values.iteritems():
508                 for attribute, value in attribute_values.iteritems():
509                     value = _undefer(value)
510                     if value is not None:
511                         execute_data.add_attribute_data(guid, attribute, value)
512         
513         self._experiment_execute_xml = parser.to_xml(data=execute_data)
514
515     def stop(self):
516        for testbed in self._testbeds.values():
517            testbed.stop()
518        self._unpersist_testbed_proxies()
519    
520     def recover(self):
521         # reload perviously persisted testbed access configurations
522         self._load_testbed_proxies()
523         
524         # Parse experiment xml
525         parser = XmlExperimentParser()
526         data = parser.from_xml_to_data(self._experiment_design_xml)
527         
528         # recreate testbed proxies by reconnecting only
529         self._init_testbed_controllers(data, recover = True)
530         
531         # another time, for netrefs
532         self._init_testbed_controllers(data, recover = True)
533         
534         print >>sys.stderr, "RECOVERED"
535
536     def is_finished(self, guid):
537         testbed = self._testbed_for_guid(guid)
538         if testbed != None:
539             return testbed.status(guid) == AS.STATUS_FINISHED
540         raise RuntimeError("No element exists with guid %d" % guid)    
541
542     def set(self, guid, name, value, time = TIME_NOW):
543         testbed = self._testbed_for_guid(guid)
544         if testbed != None:
545             testbed.set(guid, name, value, time)
546         else:
547             raise RuntimeError("No element exists with guid %d" % guid)    
548
549     def get(self, guid, name, time = TIME_NOW):
550         testbed = self._testbed_for_guid(guid)
551         if testbed != None:
552             return testbed.get(guid, name, time)
553         raise RuntimeError("No element exists with guid %d" % guid)    
554
555     def get_deferred(self, guid, name, time = TIME_NOW):
556         testbed = self._testbed_for_guid(guid)
557         if testbed != None:
558             return testbed.get_deferred(guid, name, time)
559         raise RuntimeError("No element exists with guid %d" % guid)    
560
561     def get_factory_id(self, guid):
562         testbed = self._testbed_for_guid(guid)
563         if testbed != None:
564             return testbed.get_factory_id(guid)
565         raise RuntimeError("No element exists with guid %d" % guid)    
566
567     def get_testbed_id(self, guid):
568         testbed = self._testbed_for_guid(guid)
569         if testbed != None:
570             return testbed.testbed_id
571         raise RuntimeError("No element exists with guid %d" % guid)    
572
573     def get_testbed_version(self, guid):
574         testbed = self._testbed_for_guid(guid)
575         if testbed != None:
576             return testbed.testbed_version
577         raise RuntimeError("No element exists with guid %d" % guid)    
578
579     def shutdown(self):
580         exceptions = list()
581         for testbed in self._testbeds.values():
582             try:
583                 testbed.shutdown()
584             except:
585                 exceptions.append(sys.exc_info())
586         for exc_info in exceptions:
587             raise exc_info[0], exc_info[1], exc_info[2]
588
589     def _testbed_for_guid(self, guid):
590         for testbed_guid in self._testbeds.keys():
591             if guid in self._guids_in_testbed(testbed_guid):
592                 return self._testbeds[testbed_guid]
593         return None
594
595     def _guids_in_testbed(self, testbed_guid):
596         if testbed_guid not in self._testbeds:
597             return set()
598         if testbed_guid not in self._guids_in_testbed_cache:
599             self._guids_in_testbed_cache[testbed_guid] = \
600                 set(self._testbeds[testbed_guid].guids)
601         return self._guids_in_testbed_cache[testbed_guid]
602
603     @staticmethod
604     def _netref_component_split(component):
605         match = COMPONENT_PATTERN.match(component)
606         if match:
607             return match.group("kind"), match.group("index")
608         else:
609             return component, None
610
611     _NETREF_COMPONENT_GETTERS = {
612         'addr':
613             lambda testbed, guid, index, name: 
614                 testbed.get_address(guid, int(index), name),
615         'route' :
616             lambda testbed, guid, index, name: 
617                 testbed.get_route(guid, int(index), name),
618         'trace' :
619             lambda testbed, guid, index, name: 
620                 testbed.trace(guid, index, name),
621         '' : 
622             lambda testbed, guid, index, name: 
623                 testbed.get(guid, name),
624     }
625     
626     def resolve_netref_value(self, value, failval = None):
627         match = ATTRIBUTE_PATTERN_BASE.search(value)
628         if match:
629             label = match.group("label")
630             if label.startswith('GUID-'):
631                 ref_guid = int(label[5:])
632                 if ref_guid:
633                     expr = match.group("expr")
634                     component = (match.group("component") or "")[1:] # skip the dot
635                     attribute = match.group("attribute")
636                     
637                     # split compound components into component kind and index
638                     # eg: 'addr[0]' -> ('addr', '0')
639                     component, component_index = self._netref_component_split(component)
640
641                     # find object and resolve expression
642                     for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
643                         if component not in self._NETREF_COMPONENT_GETTERS:
644                             raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
645                         elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
646                             pass
647                         else:
648                             ref_value = self._NETREF_COMPONENT_GETTERS[component](
649                                 ref_testbed, ref_guid, component_index, attribute)
650                             if ref_value:
651                                 return value.replace(match.group(), ref_value)
652         # couldn't find value
653         return failval
654     
655     def do_netrefs(self, data, fail_if_undefined = False):
656         # element netrefs
657         for (testbed_guid, guid), attrs in self._netrefs.items():
658             testbed = self._testbeds.get(testbed_guid)
659             if testbed is not None:
660                 for name in set(attrs):
661                     value = testbed.get(guid, name)
662                     if isinstance(value, basestring):
663                         ref_value = self.resolve_netref_value(value)
664                         if ref_value is not None:
665                             testbed.set(guid, name, ref_value)
666                             attrs.remove(name)
667                         elif fail_if_undefined:
668                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
669                 if not attrs:
670                     del self._netrefs[(testbed_guid, guid)]
671         
672         # testbed netrefs
673         for testbed_guid, attrs in self._testbed_netrefs.items():
674             tb_data = dict(data.get_attribute_data(testbed_guid))
675             if data:
676                 for name in set(attrs):
677                     value = tb_data.get(name)
678                     if isinstance(value, basestring):
679                         ref_value = self.resolve_netref_value(value)
680                         if ref_value is not None:
681                             data.set_attribute_data(testbed_guid, name, ref_value)
682                             attrs.remove(name)
683                         elif fail_if_undefined:
684                             raise ValueError, "Unresolvable netref in: %r" % (value,)
685                 if not attrs:
686                     del self._testbed_netrefs[testbed_guid]
687         
688
689     def _init_testbed_controllers(self, data, recover = False):
690         blacklist_testbeds = set(self._testbeds)
691         element_guids = list()
692         label_guids = dict()
693         data_guids = data.guids
694
695         # create testbed controllers
696         for guid in data_guids:
697             if data.is_testbed_data(guid):
698                 if guid not in self._testbeds:
699                     self._create_testbed_controller(guid, data, element_guids,
700                             recover)
701             else:
702                 (testbed_guid, factory_id) = data.get_box_data(guid)
703                 if testbed_guid not in blacklist_testbeds:
704                     element_guids.append(guid)
705                     label = data.get_attribute_data(guid, "label")
706                     if label is not None:
707                         if label in label_guids:
708                             raise RuntimeError, "Label %r is not unique" % (label,)
709                         label_guids[label] = guid
710
711         # replace references to elements labels for its guid
712         self._resolve_labels(data, data_guids, label_guids)
713     
714         # program testbed controllers
715         if not recover:
716             self._program_testbed_controllers(element_guids, data)
717
718     def _resolve_labels(self, data, data_guids, label_guids):
719         netrefs = self._netrefs
720         testbed_netrefs = self._testbed_netrefs
721         for guid in data_guids:
722             for name, value in data.get_attribute_data(guid):
723                 if isinstance(value, basestring):
724                     match = ATTRIBUTE_PATTERN_BASE.search(value)
725                     if match:
726                         label = match.group("label")
727                         if not label.startswith('GUID-'):
728                             ref_guid = label_guids.get(label)
729                             if ref_guid is not None:
730                                 value = ATTRIBUTE_PATTERN_BASE.sub(
731                                     ATTRIBUTE_PATTERN_GUID_SUB % dict(
732                                         guid = 'GUID-%d' % (ref_guid,),
733                                         expr = match.group("expr"),
734                                         label = label), 
735                                     value)
736                                 data.set_attribute_data(guid, name, value)
737                                 
738                                 # memorize which guid-attribute pairs require
739                                 # postprocessing, to avoid excessive controller-testbed
740                                 # communication at configuration time
741                                 # (which could require high-latency network I/O)
742                                 if not data.is_testbed_data(guid):
743                                     (testbed_guid, factory_id) = data.get_box_data(guid)
744                                     netrefs[(testbed_guid, guid)].add(name)
745                                 else:
746                                     testbed_netrefs[guid].add(name)
747
748     def _create_testbed_controller(self, guid, data, element_guids, recover):
749         (testbed_id, testbed_version) = data.get_testbed_data(guid)
750         deployment_config = self._deployment_config.get(guid)
751         
752         # deferred import because proxy needs
753         # our class definitions to define proxies
754         import nepi.util.proxy as proxy
755         
756         if deployment_config is None:
757             # need to create one
758             deployment_config = proxy.AccessConfiguration()
759             
760             for (name, value) in data.get_attribute_data(guid):
761                 if value is not None and deployment_config.has_attribute(name):
762                     # if any deployment config attribute has a netref, we can't
763                     # create this controller yet
764                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
765                         # remember to re-issue this one
766                         self._netreffed_testbeds.add(guid)
767                         return
768                     
769                     # copy deployment config attribute
770                     deployment_config.set_attribute_value(name, value)
771             
772             # commit config
773             self._deployment_config[guid] = deployment_config
774         
775         if deployment_config is not None:
776             # force recovery mode 
777             deployment_config.set_attribute_value("recover",recover)
778         
779         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
780                 deployment_config)
781         for (name, value) in data.get_attribute_data(guid):
782             testbed.defer_configure(name, value)
783         self._testbeds[guid] = testbed
784         if guid in self._netreffed_testbeds:
785             self._netreffed_testbeds.remove(guid)
786
787     def _program_testbed_controllers(self, element_guids, data):
788         def resolve_create_netref(data, guid, name, value): 
789             # Try to resolve create-time netrefs, if possible
790             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
791                 try:
792                     nuvalue = self.resolve_netref_value(value)
793                 except:
794                     # Any trouble means we're not in shape to resolve the netref yet
795                     nuvalue = None
796                 if nuvalue is not None:
797                     # Only if we succeed we remove the netref deferral entry
798                     value = nuvalue
799                     data.set_attribute_data(guid, name, value)
800                     if (testbed_guid, guid) in self._netrefs:
801                         self._netrefs[(testbed_guid, guid)].discard(name)
802             return value
803
804         for guid in element_guids:
805             (testbed_guid, factory_id) = data.get_box_data(guid)
806             testbed = self._testbeds.get(testbed_guid)
807             if testbed is not None:
808                 # create
809                 testbed.defer_create(guid, factory_id)
810                 # set attributes
811                 for (name, value) in data.get_attribute_data(guid):
812                     value = resolve_create_netref(data, guid, name, value)
813                     testbed.defer_create_set(guid, name, value)
814
815         for guid in element_guids:
816             (testbed_guid, factory_id) = data.get_box_data(guid)
817             testbed = self._testbeds.get(testbed_guid)
818             if testbed is not None:
819                 # traces
820                 for trace_id in data.get_trace_data(guid):
821                     testbed.defer_add_trace(guid, trace_id)
822                 # addresses
823                 for (address, netprefix, broadcast) in data.get_address_data(guid):
824                     if address != None:
825                         testbed.defer_add_address(guid, address, netprefix, 
826                                 broadcast)
827                 # routes
828                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
829                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
830                 # store connections data
831                 for (connector_type_name, other_guid, other_connector_type_name) \
832                         in data.get_connection_data(guid):
833                     (other_testbed_guid, other_factory_id) = data.get_box_data(
834                             other_guid)
835                     if testbed_guid == other_testbed_guid:
836                         # each testbed should take care of enforcing internal
837                         # connection simmetry, so each connection is only
838                         # added in one direction
839                         testbed.defer_connect(guid, connector_type_name, 
840                                 other_guid, other_connector_type_name)
841
842     def _program_testbed_cross_connections(self, data):
843         data_guids = data.guids
844         for guid in data_guids: 
845             if not data.is_testbed_data(guid):
846                 (testbed_guid, factory_id) = data.get_box_data(guid)
847                 testbed = self._testbeds.get(testbed_guid)
848                 if testbed is not None:
849                     for (connector_type_name, cross_guid, cross_connector_type_name) \
850                             in data.get_connection_data(guid):
851                         (testbed_guid, factory_id) = data.get_box_data(guid)
852                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
853                                 cross_guid)
854                         if testbed_guid != cross_testbed_guid:
855                             cross_testbed = self._testbeds[cross_testbed_guid]
856                             cross_testbed_id = cross_testbed.testbed_id
857                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
858                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
859                                     cross_connector_type_name)
860                             # save cross data for later
861                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
862                                     cross_guid)
863
864     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
865         if testbed_guid not in self._cross_data:
866             self._cross_data[testbed_guid] = dict()
867         if cross_testbed_guid not in self._cross_data[testbed_guid]:
868             self._cross_data[testbed_guid][cross_testbed_guid] = set()
869         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
870
871     def _get_cross_data(self, testbed_guid):
872         cross_data = dict()
873         if not testbed_guid in self._cross_data:
874             return cross_data
875
876         # fetch attribute lists in one batch
877         attribute_lists = dict()
878         for cross_testbed_guid, guid_list in \
879                 self._cross_data[testbed_guid].iteritems():
880             cross_testbed = self._testbeds[cross_testbed_guid]
881             for cross_guid in guid_list:
882                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
883                     cross_testbed.get_attribute_list_deferred(cross_guid)
884
885         # fetch attribute values in another batch
886         for cross_testbed_guid, guid_list in \
887                 self._cross_data[testbed_guid].iteritems():
888             cross_data[cross_testbed_guid] = dict()
889             cross_testbed = self._testbeds[cross_testbed_guid]
890             for cross_guid in guid_list:
891                 elem_cross_data = dict(
892                     _guid = cross_guid,
893                     _testbed_guid = cross_testbed_guid,
894                     _testbed_id = cross_testbed.testbed_id,
895                     _testbed_version = cross_testbed.testbed_version)
896                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
897                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
898                 for attr_name in attribute_list:
899                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
900                     elem_cross_data[attr_name] = attr_value
901         
902         # undefer all values - we'll have to serialize them probably later
903         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
904             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
905                 for attr_name, attr_value in elem_cross_data.iteritems():
906                     elem_cross_data[attr_name] = _undefer(attr_value)
907         
908         return cross_data
909