Merge with aly's
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16 import logging
17
# Netref placeholder syntax: "{#[<label>]<expr>#}", where <expr> is an optional
# component ('.addr[N]', '.route[N]' or '.trace[N]') followed by one character
# and "[<attribute>]", e.g. "{#[GUID-3].addr[0].[Address]#}".
# NOTE(review): the '.' before '\[' inside <expr> is unescaped, so any single
# character (not only a literal dot) is accepted there - confirm intended.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-format template that rebuilds a netref from 'guid' and 'expr' values
# (its users are outside this chunk).
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a component such as 'addr[0]' into kind ('addr') and index ('0').
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
21
22 def _undefer(deferred):
23     if hasattr(deferred, '_get'):
24         return deferred._get()
25     else:
26         return deferred
27
28
class TestbedController(object):
    """
    Interface of a single testbed instance, as driven by ExperimentController.

    Every method here raises NotImplementedError and is meant to be
    overridden by concrete testbed implementations. A comment in
    ExperimentController notes that the proxy module needs these class
    definitions, so keep method names and signatures stable.

    Programming happens in two stages: ``defer_*`` calls record what should
    be built, then the ``do_*`` steps carry it out.
    """
    def __init__(self, testbed_id, testbed_version):
        # identifier and version of the testbed, as given by the caller
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed id given at construction."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements held by this testbed."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element ``guid`` from factory ``factory_id``"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self,
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and
        their attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """
        Start the testbed; ExperimentController calls it after do_prestart
        and the final netref resolution.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the testbed; called by ExperimentController.stop."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:

            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>

        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute ``name`` of element ``guid`` to ``value`` (``time`` defaults to TIME_NOW)."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Return attribute ``name`` of element ``guid`` (``time`` defaults to TIME_NOW)."""
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """
        Return the attribute names of element ``guid``.

        ``filter_flags``/``exclude`` presumably restrict the list by Attribute
        flags (e.g. Attribute.ExecImmutable) - confirm in implementations.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the id of the factory element ``guid`` was created from."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Perform ``action`` on element ``guid`` (``time`` presumably schedules it - confirm)."""
        raise NotImplementedError

    def status(self, guid):
        """Return the status of element ``guid`` (compared against ApplicationStatus values by callers)."""
        raise NotImplementedError

    def testbed_status(self):
        """Return the status of the testbed as a whole (presumably a TestbedStatus value)."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return ``attribute`` (default 'value') of trace ``trace_id`` on element ``guid``."""
        raise NotImplementedError

    def traces_info(self):
        """
        Return information about this testbed's traces, as a dictionary of
        dictionaries::

            {guid: {trace_id: {'host': host,
                               'filepath': filepath,
                               'filesize': size in bytes}}}
        """
        raise NotImplementedError

    def shutdown(self):
        """Shut down the testbed; called by ExperimentController.shutdown."""
        raise NotImplementedError
227
228 class ExperimentController(object):
229     def __init__(self, experiment_xml, root_dir):
230         self._experiment_design_xml = experiment_xml
231         self._experiment_execute_xml = None
232         self._testbeds = dict()
233         self._deployment_config = dict()
234         self._netrefs = collections.defaultdict(set)
235         self._testbed_netrefs = collections.defaultdict(set)
236         self._cross_data = dict()
237         self._root_dir = root_dir
238         self._netreffed_testbeds = set()
239         self._guids_in_testbed_cache = dict()
240         self._failed_testbeds = set()
241         self._started_time = None
242         self._stopped_time = None
243         
244         self._logger = logging.getLogger('nepi.core.execute')
245         
246         if experiment_xml is None and root_dir is not None:
247             # Recover
248             self.load_experiment_xml()
249             self.load_execute_xml()
250         else:
251             self.persist_experiment_xml()
252
253     @property
254     def experiment_design_xml(self):
255         return self._experiment_design_xml
256
257     @property
258     def experiment_execute_xml(self):
259         return self._experiment_execute_xml
260
261     @property
262     def started_time(self):
263         return self._started_time
264
265     @property
266     def stopped_time(self):
267         return self._stopped_time
268
269     @property
270     def guids(self):
271         guids = list()
272         for testbed_guid in self._testbeds.keys():
273             _guids = self._guids_in_testbed(testbed_guid)
274             if _guids:
275                 guids.extend(_guids)
276         return guids
277
278     def persist_experiment_xml(self):
279         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
280         f = open(xml_path, "w")
281         f.write(self._experiment_design_xml)
282         f.close()
283
284     def persist_execute_xml(self):
285         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
286         f = open(xml_path, "w")
287         f.write(self._experiment_execute_xml)
288         f.close()
289
290     def load_experiment_xml(self):
291         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
292         f = open(xml_path, "r")
293         self._experiment_design_xml = f.read()
294         f.close()
295
296     def load_execute_xml(self):
297         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
298         f = open(xml_path, "r")
299         self._experiment_execute_xml = f.read()
300         f.close()
301
302     def trace(self, guid, trace_id, attribute='value'):
303         testbed = self._testbed_for_guid(guid)
304         if testbed != None:
305             return testbed.trace(guid, trace_id, attribute)
306         raise RuntimeError("No element exists with guid %d" % guid)    
307
308     def traces_info(self):
309         traces_info = dict()
310         for guid, testbed in self._testbeds.iteritems():
311             tinfo = testbed.traces_info()
312             if tinfo:
313                 traces_info[guid] = testbed.traces_info()
314         return traces_info
315
316     @staticmethod
317     def _parallel(callables):
318         excs = []
319         def wrap(callable):
320             @functools.wraps(callable)
321             def wrapped(*p, **kw):
322                 try:
323                     callable(*p, **kw)
324                 except:
325                     logging.exception("Exception occurred in asynchronous thread:")
326                     excs.append(sys.exc_info())
327             return wrapped
328         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
329         for thread in threads:
330             thread.start()
331         for thread in threads:
332             thread.join()
333         for exc in excs:
334             eTyp, eVal, eLoc = exc
335             raise eTyp, eVal, eLoc
336
337     def start(self):
338         self._started_time = time.time() 
339         self._start()
340
341     def _start(self, recover = False):
342         parser = XmlExperimentParser()
343         
344         if recover:
345             xml = self._experiment_execute_xml
346         else:
347             xml = self._experiment_design_xml
348         data = parser.from_xml_to_data(xml)
349
350         # instantiate testbed controllers
351         to_recover, to_restart = self._init_testbed_controllers(data, recover)
352         all_restart = set(to_restart)
353         
354         if not recover:
355             # persist testbed connection data, for potential recovery
356             self._persist_testbed_proxies()
357         else:
358             # recover recoverable controllers
359             for guid in to_recover:
360                 try:
361                     self._testbeds[guid].do_setup()
362                     self._testbeds[guid].recover()
363                 except:
364                     self._logger.exception("During recovery of testbed %s", guid)
365                     
366                     # Mark failed
367                     self._failed_testbeds.add(guid)
368     
369         def steps_to_configure(self, allowed_guids):
370             # perform setup in parallel for all test beds,
371             # wait for all threads to finish
372             self._parallel([testbed.do_setup 
373                             for guid,testbed in self._testbeds.iteritems()
374                             if guid in allowed_guids])
375        
376             # perform create-connect in parallel, wait
377             # (internal connections only)
378             self._parallel([testbed.do_create
379                             for guid,testbed in self._testbeds.iteritems()
380                             if guid in allowed_guids])
381
382             self._parallel([testbed.do_connect_init
383                             for guid,testbed in self._testbeds.iteritems()
384                             if guid in allowed_guids])
385
386             self._parallel([testbed.do_connect_compl
387                             for guid,testbed in self._testbeds.iteritems()
388                             if guid in allowed_guids])
389
390             self._parallel([testbed.do_preconfigure
391                             for guid,testbed in self._testbeds.iteritems()
392                             if guid in allowed_guids])
393             self._clear_caches()
394
395         steps_to_configure(self, to_restart)
396
397         if self._netreffed_testbeds:
398             # initally resolve netrefs
399             self.do_netrefs(data, fail_if_undefined=False)
400             
401             # rinse and repeat, for netreffed testbeds
402             netreffed_testbeds = set(self._netreffed_testbeds)
403
404             to_recover, to_restart = self._init_testbed_controllers(data, recover)
405             all_restart.update(to_restart)
406             
407             if not recover:
408                 # persist testbed connection data, for potential recovery
409                 self._persist_testbed_proxies()
410             else:
411                 # recover recoverable controllers
412                 for guid in to_recover:
413                     try:
414                         self._testbeds[guid].do_setup()
415                         self._testbeds[guid].recover()
416                     except:
417                         self._logger.exception("During recovery of testbed %s", guid)
418
419                         # Mark failed
420                         self._failed_testbeds.add(guid)
421
422             # configure dependant testbeds
423             steps_to_configure(self, to_restart)
424         
425         all_restart = [ self._testbeds[guid] for guid in all_restart ]
426             
427         # final netref step, fail if anything's left unresolved
428         self.do_netrefs(data, fail_if_undefined=False)
429        
430         # Only now, that netref dependencies have been solve, it is safe to
431         # program cross_connections
432         self._program_testbed_cross_connections(data)
433  
434         # perform do_configure in parallel for al testbeds
435         # (it's internal configuration for each)
436         self._parallel([testbed.do_configure
437                         for testbed in all_restart])
438
439         self._clear_caches()
440
441         #print >>sys.stderr, "DO IT"
442         #import time
443         #time.sleep(60)
444         
445         # cross-connect (cannot be done in parallel)
446         for guid, testbed in self._testbeds.iteritems():
447             cross_data = self._get_cross_data(guid)
448             testbed.do_cross_connect_init(cross_data)
449         for guid, testbed in self._testbeds.iteritems():
450             cross_data = self._get_cross_data(guid)
451             testbed.do_cross_connect_compl(cross_data)
452        
453         self._clear_caches()
454
455         # Last chance to configure (parallel on all testbeds)
456         self._parallel([testbed.do_prestart
457                         for testbed in all_restart])
458
459         # final netref step, fail if anything's left unresolved
460         self.do_netrefs(data, fail_if_undefined=True)
461  
462         self._clear_caches()
463         
464         if not recover:
465             # update execution xml with execution-specific values
466             # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
467             self._update_execute_xml()
468             self.persist_execute_xml()
469
470         # start experiment (parallel start on all testbeds)
471         self._parallel([testbed.start
472                         for testbed in all_restart])
473
474         self._clear_caches()
475
476     def _clear_caches(self):
477         # Cleaning cache for safety.
478         self._guids_in_testbed_cache = dict()
479
480     def _persist_testbed_proxies(self):
481         TRANSIENT = (DC.RECOVER,)
482         
483         # persist access configuration for all testbeds, so that
484         # recovery mode can reconnect to them if it becomes necessary
485         conf = ConfigParser.RawConfigParser()
486         for testbed_guid, testbed_config in self._deployment_config.iteritems():
487             testbed_guid = str(testbed_guid)
488             conf.add_section(testbed_guid)
489             for attr in testbed_config.get_attribute_list():
490                 if attr not in TRANSIENT:
491                     conf.set(testbed_guid, attr, 
492                         testbed_config.get_attribute_value(attr))
493         
494         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
495         conf.write(f)
496         f.close()
497     
498     def _load_testbed_proxies(self):
499         TYPEMAP = {
500             Attribute.STRING : 'get',
501             Attribute.BOOL : 'getboolean',
502             Attribute.ENUM : 'get',
503             Attribute.DOUBLE : 'getfloat',
504             Attribute.INTEGER : 'getint',
505         }
506         
507         TRANSIENT = (DC.RECOVER,)
508         
509         # deferred import because proxy needs
510         # our class definitions to define proxies
511         import nepi.util.proxy as proxy
512         
513         conf = ConfigParser.RawConfigParser()
514         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
515         for testbed_guid in conf.sections():
516             testbed_config = proxy.AccessConfiguration()
517             testbed_guid = str(testbed_guid)
518             for attr in testbed_config.get_attribute_list():
519                 if attr not in TRANSIENT:
520                     getter = getattr(conf, TYPEMAP.get(
521                         testbed_config.get_attribute_type(attr),
522                         'get') )
523                     testbed_config.set_attribute_value(
524                         attr, getter(testbed_guid, attr))
525     
526     def _unpersist_testbed_proxies(self):
527         try:
528             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
529         except:
530             # Just print exceptions, this is just cleanup
531             self._logger.exception("Loading testbed configuration")
532
533     def _update_execute_xml(self):
534         # For all testbeds,
535         #   For all elements in testbed,
536         #       - gather immutable execute-readable attribuets lists
537         #         asynchronously
538         # Generate new design description from design xml
539         # (Wait for attributes lists - implicit syncpoint)
540         # For all testbeds,
541         #   For all elements in testbed,
542         #       - gather all immutable execute-readable attribute
543         #         values, asynchronously
544         # (Wait for attribute values - implicit syncpoint)
545         # For all testbeds,
546         #   For all elements in testbed,
547         #       - inject non-None values into new design
548         # Generate execute xml from new design
549
550         attribute_lists = dict(
551             (testbed_guid, collections.defaultdict(dict))
552             for testbed_guid in self._testbeds
553         )
554         
555         for testbed_guid, testbed in self._testbeds.iteritems():
556             guids = self._guids_in_testbed(testbed_guid)
557             for guid in guids:
558                 attribute_lists[testbed_guid][guid] = \
559                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
560         
561         parser = XmlExperimentParser()
562         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
563
564         attribute_values = dict(
565             (testbed_guid, collections.defaultdict(dict))
566             for testbed_guid in self._testbeds
567         )
568         
569         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
570             testbed = self._testbeds[testbed_guid]
571             for guid, attribute_list in testbed_attribute_lists.iteritems():
572                 attribute_list = _undefer(attribute_list)
573                 attribute_values[testbed_guid][guid] = dict(
574                     (attribute, testbed.get_deferred(guid, attribute))
575                     for attribute in attribute_list
576                 )
577         
578         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
579             for guid, attribute_values in testbed_attribute_values.iteritems():
580                 for attribute, value in attribute_values.iteritems():
581                     value = _undefer(value)
582                     if value is not None:
583                         execute_data.add_attribute_data(guid, attribute, value)
584         
585         self._experiment_execute_xml = parser.to_xml(data=execute_data)
586
587     def stop(self):
588        for testbed in self._testbeds.values():
589            testbed.stop()
590        self._unpersist_testbed_proxies()
591        self._stopped_time = time.time() 
592    
593     def recover(self):
594         # reload perviously persisted testbed access configurations
595         self._failed_testbeds.clear()
596         self._load_testbed_proxies()
597
598         # re-program testbeds that need recovery
599         self._start(recover = True)
600
601     def is_finished(self, guid):
602         testbed = self._testbed_for_guid(guid)
603         if testbed != None:
604             return testbed.status(guid) == AS.STATUS_FINISHED
605         raise RuntimeError("No element exists with guid %d" % guid)    
606     
607     def _testbed_recovery_policy(self, guid, data = None):
608         if data is None:
609             parser = XmlExperimentParser()
610             data = parser.from_xml_to_data(self._experiment_design_xml)
611         
612         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
613
614     def status(self, guid):
615         if guid in self._testbeds:
616             # guid is a testbed
617             # report testbed status
618             if guid in self._failed_testbeds:
619                 return TS.STATUS_FAILED
620             else:
621                 try:
622                     return self._testbeds[guid].status()
623                 except:
624                     return TS.STATUS_UNRESPONSIVE
625         else:
626             # guid is an element
627             testbed = self._testbed_for_guid(guid)
628             if testbed is not None:
629                 return testbed.status(guid)
630             else:
631                 return AS.STATUS_UNDETERMINED
632
633     def set(self, guid, name, value, time = TIME_NOW):
634         testbed = self._testbed_for_guid(guid)
635         if testbed != None:
636             testbed.set(guid, name, value, time)
637         else:
638             raise RuntimeError("No element exists with guid %d" % guid)    
639
640     def get(self, guid, name, time = TIME_NOW):
641         testbed = self._testbed_for_guid(guid)
642         if testbed != None:
643             return testbed.get(guid, name, time)
644         raise RuntimeError("No element exists with guid %d" % guid)    
645
646     def get_deferred(self, guid, name, time = TIME_NOW):
647         testbed = self._testbed_for_guid(guid)
648         if testbed != None:
649             return testbed.get_deferred(guid, name, time)
650         raise RuntimeError("No element exists with guid %d" % guid)    
651
652     def get_factory_id(self, guid):
653         testbed = self._testbed_for_guid(guid)
654         if testbed != None:
655             return testbed.get_factory_id(guid)
656         raise RuntimeError("No element exists with guid %d" % guid)    
657
658     def get_testbed_id(self, guid):
659         testbed = self._testbed_for_guid(guid)
660         if testbed != None:
661             return testbed.testbed_id
662         raise RuntimeError("No element exists with guid %d" % guid)    
663
664     def get_testbed_version(self, guid):
665         testbed = self._testbed_for_guid(guid)
666         if testbed != None:
667             return testbed.testbed_version
668         raise RuntimeError("No element exists with guid %d" % guid)    
669
670     def shutdown(self):
671         exceptions = list()
672         for testbed in self._testbeds.values():
673             try:
674                 testbed.shutdown()
675             except:
676                 exceptions.append(sys.exc_info())
677         for exc_info in exceptions:
678             raise exc_info[0], exc_info[1], exc_info[2]
679
680     def _testbed_for_guid(self, guid):
681         for testbed_guid in self._testbeds.keys():
682             if guid in self._guids_in_testbed(testbed_guid):
683                 if testbed_guid in self._failed_testbeds:
684                     return None
685                 return self._testbeds[testbed_guid]
686         return None
687
688     def _guids_in_testbed(self, testbed_guid):
689         if testbed_guid not in self._testbeds:
690             return set()
691         if testbed_guid not in self._guids_in_testbed_cache:
692             self._guids_in_testbed_cache[testbed_guid] = \
693                 set(self._testbeds[testbed_guid].guids)
694         return self._guids_in_testbed_cache[testbed_guid]
695
696     @staticmethod
697     def _netref_component_split(component):
698         match = COMPONENT_PATTERN.match(component)
699         if match:
700             return match.group("kind"), match.group("index")
701         else:
702             return component, None
703
704     _NETREF_COMPONENT_GETTERS = {
705         'addr':
706             lambda testbed, guid, index, name: 
707                 testbed.get_address(guid, int(index), name),
708         'route' :
709             lambda testbed, guid, index, name: 
710                 testbed.get_route(guid, int(index), name),
711         'trace' :
712             lambda testbed, guid, index, name: 
713                 testbed.trace(guid, index, name),
714         '' : 
715             lambda testbed, guid, index, name: 
716                 testbed.get(guid, name),
717     }
718     
719     def resolve_netref_value(self, value, failval = None):
720         rv = failval
721         while True:
722             for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
723                 label = match.group("label")
724                 if label.startswith('GUID-'):
725                     ref_guid = int(label[5:])
726                     if ref_guid:
727                         expr = match.group("expr")
728                         component = (match.group("component") or "")[1:] # skip the dot
729                         attribute = match.group("attribute")
730                         
731                         # split compound components into component kind and index
732                         # eg: 'addr[0]' -> ('addr', '0')
733                         component, component_index = self._netref_component_split(component)
734
735                         # find object and resolve expression
736                         for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
737                             if component not in self._NETREF_COMPONENT_GETTERS:
738                                 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
739                             elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
740                                 pass
741                             else:
742                                 ref_value = self._NETREF_COMPONENT_GETTERS[component](
743                                     ref_testbed, ref_guid, component_index, attribute)
744                                 if ref_value:
745                                     value = rv = value.replace(match.group(), ref_value)
746                                     break
747                         else:
748                             # unresolvable netref
749                             return failval
750                         break
751             else:
752                 break
753         return rv
754     
755     def do_netrefs(self, data, fail_if_undefined = False):
756         # element netrefs
757         for (testbed_guid, guid), attrs in self._netrefs.items():
758             testbed = self._testbeds.get(testbed_guid)
759             if testbed is not None:
760                 for name in set(attrs):
761                     value = testbed.get(guid, name)
762                     if isinstance(value, basestring):
763                         ref_value = self.resolve_netref_value(value)
764                         if ref_value is not None:
765                             testbed.set(guid, name, ref_value)
766                             attrs.remove(name)
767                         elif fail_if_undefined:
768                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
769                 if not attrs:
770                     del self._netrefs[(testbed_guid, guid)]
771         
772         # testbed netrefs
773         for testbed_guid, attrs in self._testbed_netrefs.items():
774             tb_data = dict(data.get_attribute_data(testbed_guid))
775             if data:
776                 for name in set(attrs):
777                     value = tb_data.get(name)
778                     if isinstance(value, basestring):
779                         ref_value = self.resolve_netref_value(value)
780                         if ref_value is not None:
781                             data.set_attribute_data(testbed_guid, name, ref_value)
782                             attrs.remove(name)
783                         elif fail_if_undefined:
784                             raise ValueError, "Unresolvable netref in: %r" % (value,)
785                 if not attrs:
786                     del self._testbed_netrefs[testbed_guid]
787         
788
789     def _init_testbed_controllers(self, data, recover = False):
790         blacklist_testbeds = set(self._testbeds)
791         element_guids = list()
792         label_guids = dict()
793         data_guids = data.guids
794         to_recover = set()
795         to_restart = set()
796
797         # gather label associations
798         for guid in data_guids:
799             if not data.is_testbed_data(guid):
800                 (testbed_guid, factory_id) = data.get_box_data(guid)
801                 label = data.get_attribute_data(guid, "label")
802                 if label is not None:
803                     if label in label_guids:
804                         raise RuntimeError, "Label %r is not unique" % (label,)
805                     label_guids[label] = guid
806
807         # create testbed controllers
808         for guid in data_guids:
809             if data.is_testbed_data(guid):
810                 if guid not in self._testbeds:
811                     try:
812                         self._create_testbed_controller(
813                             guid, data, element_guids, recover)
814                         if recover:
815                             # Already programmed
816                             blacklist_testbeds.add(guid)
817                         else:
818                             to_restart.add(guid)
819                     except:
820                         if recover:
821                             policy = self._testbed_recovery_policy(guid, data=data)
822                             if policy == DC.POLICY_RECOVER:
823                                 self._create_testbed_controller(
824                                     guid, data, element_guids, False)
825                                 to_recover.add(guid)
826                             elif policy == DC.POLICY_RESTART:
827                                 self._create_testbed_controller(
828                                     guid, data, element_guids, False)
829                                 to_restart.add(guid)
830                             else:
831                                 # Mark failed
832                                 self._failed_testbeds.add(guid)
833                         else:
834                             raise
835         
836         # queue programmable elements
837         #  - that have not been programmed already (blacklist_testbeds)
838         #  - including recovered or restarted testbeds
839         #  - but those that have no unresolved netrefs
840         for guid in data_guids:
841             if not data.is_testbed_data(guid):
842                 (testbed_guid, factory_id) = data.get_box_data(guid)
843                 if testbed_guid not in blacklist_testbeds:
844                     element_guids.append(guid)
845
846         # replace references to elements labels for its guid
847         self._resolve_labels(data, data_guids, label_guids)
848     
849         # program testbed controllers
850         if element_guids:
851             self._program_testbed_controllers(element_guids, data)
852         
853         return to_recover, to_restart
854
855     def _resolve_labels(self, data, data_guids, label_guids):
856         netrefs = self._netrefs
857         testbed_netrefs = self._testbed_netrefs
858         for guid in data_guids:
859             for name, value in data.get_attribute_data(guid):
860                 if isinstance(value, basestring):
861                     while True:
862                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
863                             label = match.group("label")
864                             if not label.startswith('GUID-'):
865                                 ref_guid = label_guids.get(label)
866                                 if ref_guid is not None:
867                                     value = value.replace(
868                                         match.group(),
869                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
870                                             guid = 'GUID-%d' % (ref_guid,),
871                                             expr = match.group("expr"),
872                                             label = label)
873                                     )
874                                     data.set_attribute_data(guid, name, value)
875                                     
876                                     # memorize which guid-attribute pairs require
877                                     # postprocessing, to avoid excessive controller-testbed
878                                     # communication at configuration time
879                                     # (which could require high-latency network I/O)
880                                     if not data.is_testbed_data(guid):
881                                         (testbed_guid, factory_id) = data.get_box_data(guid)
882                                         netrefs[(testbed_guid, guid)].add(name)
883                                     else:
884                                         testbed_netrefs[guid].add(name)
885                                     
886                                     break
887                         else:
888                             break
889
890     def _create_testbed_controller(self, guid, data, element_guids, recover):
891         (testbed_id, testbed_version) = data.get_testbed_data(guid)
892         deployment_config = self._deployment_config.get(guid)
893         
894         # deferred import because proxy needs
895         # our class definitions to define proxies
896         import nepi.util.proxy as proxy
897         
898         if deployment_config is None:
899             # need to create one
900             deployment_config = proxy.AccessConfiguration()
901             
902             for (name, value) in data.get_attribute_data(guid):
903                 if value is not None and deployment_config.has_attribute(name):
904                     # if any deployment config attribute has a netref, we can't
905                     # create this controller yet
906                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
907                         # remember to re-issue this one
908                         self._netreffed_testbeds.add(guid)
909                         return
910                     
911                     # copy deployment config attribute
912                     deployment_config.set_attribute_value(name, value)
913             
914             # commit config
915             self._deployment_config[guid] = deployment_config
916         
917         if deployment_config is not None:
918             # force recovery mode 
919             deployment_config.set_attribute_value("recover",recover)
920         
921         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
922                 deployment_config)
923         for (name, value) in data.get_attribute_data(guid):
924             testbed.defer_configure(name, value)
925         self._testbeds[guid] = testbed
926         if guid in self._netreffed_testbeds:
927             self._netreffed_testbeds.remove(guid)
928
929     def _program_testbed_controllers(self, element_guids, data):
930         def resolve_create_netref(data, guid, name, value): 
931             # Try to resolve create-time netrefs, if possible
932             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
933                 try:
934                     nuvalue = self.resolve_netref_value(value)
935                 except:
936                     # Any trouble means we're not in shape to resolve the netref yet
937                     nuvalue = None
938                 if nuvalue is not None:
939                     # Only if we succeed we remove the netref deferral entry
940                     value = nuvalue
941                     data.set_attribute_data(guid, name, value)
942                     if (testbed_guid, guid) in self._netrefs:
943                         self._netrefs[(testbed_guid, guid)].discard(name)
944             return value
945
946         for guid in element_guids:
947             (testbed_guid, factory_id) = data.get_box_data(guid)
948             testbed = self._testbeds.get(testbed_guid)
949             if testbed is not None:
950                 # create
951                 testbed.defer_create(guid, factory_id)
952                 # set attributes
953                 for (name, value) in data.get_attribute_data(guid):
954                     value = resolve_create_netref(data, guid, name, value)
955                     testbed.defer_create_set(guid, name, value)
956
957         for guid in element_guids:
958             (testbed_guid, factory_id) = data.get_box_data(guid)
959             testbed = self._testbeds.get(testbed_guid)
960             if testbed is not None:
961                 # traces
962                 for trace_id in data.get_trace_data(guid):
963                     testbed.defer_add_trace(guid, trace_id)
964                 # addresses
965                 for (address, netprefix, broadcast) in data.get_address_data(guid):
966                     if address != None:
967                         testbed.defer_add_address(guid, address, netprefix, 
968                                 broadcast)
969                 # routes
970                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
971                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
972                 # store connections data
973                 for (connector_type_name, other_guid, other_connector_type_name) \
974                         in data.get_connection_data(guid):
975                     (other_testbed_guid, other_factory_id) = data.get_box_data(
976                             other_guid)
977                     if testbed_guid == other_testbed_guid:
978                         # each testbed should take care of enforcing internal
979                         # connection simmetry, so each connection is only
980                         # added in one direction
981                         testbed.defer_connect(guid, connector_type_name, 
982                                 other_guid, other_connector_type_name)
983
984     def _program_testbed_cross_connections(self, data):
985         data_guids = data.guids
986         for guid in data_guids: 
987             if not data.is_testbed_data(guid):
988                 (testbed_guid, factory_id) = data.get_box_data(guid)
989                 testbed = self._testbeds.get(testbed_guid)
990                 if testbed is not None:
991                     for (connector_type_name, cross_guid, cross_connector_type_name) \
992                             in data.get_connection_data(guid):
993                         (testbed_guid, factory_id) = data.get_box_data(guid)
994                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
995                                 cross_guid)
996                         if testbed_guid != cross_testbed_guid:
997                             cross_testbed = self._testbeds[cross_testbed_guid]
998                             cross_testbed_id = cross_testbed.testbed_id
999                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1000                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1001                                     cross_connector_type_name)
1002                             # save cross data for later
1003                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1004                                     cross_guid)
1005
1006     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1007         if testbed_guid not in self._cross_data:
1008             self._cross_data[testbed_guid] = dict()
1009         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1010             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1011         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1012
1013     def _get_cross_data(self, testbed_guid):
1014         cross_data = dict()
1015         if not testbed_guid in self._cross_data:
1016             return cross_data
1017
1018         # fetch attribute lists in one batch
1019         attribute_lists = dict()
1020         for cross_testbed_guid, guid_list in \
1021                 self._cross_data[testbed_guid].iteritems():
1022             cross_testbed = self._testbeds[cross_testbed_guid]
1023             for cross_guid in guid_list:
1024                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1025                     cross_testbed.get_attribute_list_deferred(cross_guid)
1026
1027         # fetch attribute values in another batch
1028         for cross_testbed_guid, guid_list in \
1029                 self._cross_data[testbed_guid].iteritems():
1030             cross_data[cross_testbed_guid] = dict()
1031             cross_testbed = self._testbeds[cross_testbed_guid]
1032             for cross_guid in guid_list:
1033                 elem_cross_data = dict(
1034                     _guid = cross_guid,
1035                     _testbed_guid = cross_testbed_guid,
1036                     _testbed_id = cross_testbed.testbed_id,
1037                     _testbed_version = cross_testbed.testbed_version)
1038                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1039                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1040                 for attr_name in attribute_list:
1041                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1042                     elem_cross_data[attr_name] = attr_value
1043         
1044         # undefer all values - we'll have to serialize them probably later
1045         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1046             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1047                 for attr_name, attr_value in elem_cross_data.iteritems():
1048                     elem_cross_data[attr_name] = _undefer(attr_value)
1049         
1050         return cross_data
1051