Allow multiple occurrences (even recursive occurrences) of netrefs
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15
# Matches a netref of the form "{#[<label>]<expr>#}". <expr> is an optional
# component (".addr[N]", ".route[N]" or ".trace[N]", N being decimal digits)
# followed by one separator character and "[<attribute>]".
# Named groups: label, expr, component, attribute.
# NOTE(review): the separator before "[<attribute>]" is an unescaped "." and
# therefore matches any character - presumably a literal dot was intended;
# confirm before tightening the pattern.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-format template producing a netref from a "guid" and an "expr" key.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into its kind ("addr") and
# index ("0"); see ExperimentController._netref_component_split.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
19
20 def _undefer(deferred):
21     if hasattr(deferred, '_get'):
22         return deferred._get()
23     else:
24         return deferred
25
26
class TestbedController(object):
    """
    Abstract interface of a testbed controller.

    Only the testbed id and version are stored here; every other member
    raises NotImplementedError and has to be provided by a concrete
    implementation.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction time"""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction time"""
        return self._testbed_version

    @property
    def guids(self):
        """Guids known to this testbed (abstract)"""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs the creation of an element"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Instructs the creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self,
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs the creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric=0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the initial configuration of the testbed is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and
        their attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed
        elements are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs: after connection and before
        cross connections. Useful for early stages of configuration, for
        setting up whatever might be required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init the initiation of all external
        connections between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl the completion of all external
        connections between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def stop(self):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the
        controller instance will be re-created and the following sequence
        invoked:

            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>

        Start will not be called, and after the cross connection invocations
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time=TIME_NOW):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def get(self, guid, name, time=TIME_NOW):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags=None, exclude=False):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def status(self, guid):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def testbed_status(self):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Abstract - not implemented by this base class"""
        raise NotImplementedError
225
226 class ExperimentController(object):
227     def __init__(self, experiment_xml, root_dir):
228         self._experiment_design_xml = experiment_xml
229         self._experiment_execute_xml = None
230         self._testbeds = dict()
231         self._deployment_config = dict()
232         self._netrefs = collections.defaultdict(set)
233         self._testbed_netrefs = collections.defaultdict(set)
234         self._cross_data = dict()
235         self._root_dir = root_dir
236         self._netreffed_testbeds = set()
237         self._guids_in_testbed_cache = dict()
238         self._failed_testbeds = set()
239         
240         if experiment_xml is None and root_dir is not None:
241             # Recover
242             self.load_experiment_xml()
243             self.load_execute_xml()
244         else:
245             self.persist_experiment_xml()
246
    @property
    def experiment_design_xml(self):
        """Design xml, as given to the constructor or loaded from the root directory"""
        return self._experiment_design_xml
250
    @property
    def experiment_execute_xml(self):
        """Execute xml, or None while it has been neither generated nor loaded"""
        return self._experiment_execute_xml
254
255     @property
256     def guids(self):
257         guids = list()
258         for testbed_guid in self._testbeds.keys():
259             _guids = self._guids_in_testbed(testbed_guid)
260             if _guids:
261                 guids.extend(_guids)
262         return guids
263
264     def persist_experiment_xml(self):
265         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
266         f = open(xml_path, "w")
267         f.write(self._experiment_design_xml)
268         f.close()
269
270     def persist_execute_xml(self):
271         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
272         f = open(xml_path, "w")
273         f.write(self._experiment_execute_xml)
274         f.close()
275
276     def load_experiment_xml(self):
277         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
278         f = open(xml_path, "r")
279         self._experiment_design_xml = f.read()
280         f.close()
281
282     def load_execute_xml(self):
283         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
284         f = open(xml_path, "r")
285         self._experiment_execute_xml = f.read()
286         f.close()
287
288     def trace(self, guid, trace_id, attribute='value'):
289         testbed = self._testbed_for_guid(guid)
290         if testbed != None:
291             return testbed.trace(guid, trace_id, attribute)
292         raise RuntimeError("No element exists with guid %d" % guid)    
293
294     def traces_info(self):
295         traces_info = dict()
296         for guid, testbed in self._testbeds.iteritems():
297             tinfo = testbed.traces_info()
298             if tinfo:
299                 traces_info[guid] = testbed.traces_info()
300         return traces_info
301
302     @staticmethod
303     def _parallel(callables):
304         excs = []
305         def wrap(callable):
306             @functools.wraps(callable)
307             def wrapped(*p, **kw):
308                 try:
309                     callable(*p, **kw)
310                 except:
311                     import traceback
312                     traceback.print_exc(file=sys.stderr)
313                     excs.append(sys.exc_info())
314             return wrapped
315         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
316         for thread in threads:
317             thread.start()
318         for thread in threads:
319             thread.join()
320         for exc in excs:
321             eTyp, eVal, eLoc = exc
322             raise eTyp, eVal, eLoc
323
    def start(self):
        """Start the experiment; delegates to _start() in non-recovery mode"""
        self._start()
326
    def _start(self, recover = False):
        """
        Instantiate, program and start the testbeds of the experiment.

        Params:

            recover: when True, the persisted execute xml is used instead of
                the design xml, and testbeds flagged for recovery by
                _init_testbed_controllers get do_setup() + recover() called
                on them instead of being configured from scratch.

        Testbeds whose controllers could not be created on the first pass
        (those in self._netreffed_testbeds) get a second pass after an
        initial round of netref resolution.
        """
        parser = XmlExperimentParser()
        
        # recovery works from the persisted execution description
        if recover:
            xml = self._experiment_execute_xml
        else:
            xml = self._experiment_design_xml
        data = parser.from_xml_to_data(xml)

        # instantiate testbed controllers
        to_recover, to_restart = self._init_testbed_controllers(data, recover)
        all_restart = set(to_restart)
        
        if not recover:
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()
        else:
            # recover recoverable controllers
            for guid in to_recover:
                try:
                    self._testbeds[guid].do_setup()
                    self._testbeds[guid].recover()
                except:
                    # Mark failed
                    self._failed_testbeds.add(guid)
    
        # Local helper: runs every stage up to do_preconfigure, one stage at
        # a time, on the testbeds whose guid is in allowed_guids
        def steps_to_configure(self, allowed_guids):
            # perform setup in parallel for all test beds,
            # wait for all threads to finish
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        steps_to_configure(self, to_restart)

        if self._netreffed_testbeds:
            # initially resolve netrefs
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            # NOTE(review): this local is not read anywhere below
            netreffed_testbeds = set(self._netreffed_testbeds)

            to_recover, to_restart = self._init_testbed_controllers(data, recover)
            all_restart.update(to_restart)
            
            if not recover:
                # persist testbed connection data, for potential recovery
                self._persist_testbed_proxies()
            else:
                # recover recoverable controllers
                for guid in to_recover:
                    try:
                        self._testbeds[guid].do_setup()
                        self._testbeds[guid].recover()
                    except:
                        # Mark failed
                        self._failed_testbeds.add(guid)

            # configure dependent testbeds
            steps_to_configure(self, to_restart)
        
        # from here on, all_restart holds controllers instead of guids
        all_restart = [ self._testbeds[guid] for guid in all_restart ]
            
        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
       
        # Only now, that netref dependencies have been solved, it is safe to
        # program cross_connections
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure
                        for testbed in all_restart])

        self._clear_caches()

        #print >>sys.stderr, "DO IT"
        #import time
        #time.sleep(60)
        
        # cross-connect (cannot be done in parallel)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._parallel([testbed.do_prestart
                        for testbed in all_restart])

        self._clear_caches()
        
        if not recover:
            # update execution xml with execution-specific values
            # TODO: BUG! Buggy code! Cannot stand serializing all attribute values (eg: tun_key which is non ascii)
            self._update_execute_xml()
            self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start
                        for testbed in all_restart])

        self._clear_caches()
454
455     def _clear_caches(self):
456         # Cleaning cache for safety.
457         self._guids_in_testbed_cache = dict()
458
459     def _persist_testbed_proxies(self):
460         TRANSIENT = (DC.RECOVER,)
461         
462         # persist access configuration for all testbeds, so that
463         # recovery mode can reconnect to them if it becomes necessary
464         conf = ConfigParser.RawConfigParser()
465         for testbed_guid, testbed_config in self._deployment_config.iteritems():
466             testbed_guid = str(testbed_guid)
467             conf.add_section(testbed_guid)
468             for attr in testbed_config.get_attribute_list():
469                 if attr not in TRANSIENT:
470                     conf.set(testbed_guid, attr, 
471                         testbed_config.get_attribute_value(attr))
472         
473         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
474         conf.write(f)
475         f.close()
476     
477     def _load_testbed_proxies(self):
478         TYPEMAP = {
479             Attribute.STRING : 'get',
480             Attribute.BOOL : 'getboolean',
481             Attribute.ENUM : 'get',
482             Attribute.DOUBLE : 'getfloat',
483             Attribute.INTEGER : 'getint',
484         }
485         
486         TRANSIENT = (DC.RECOVER,)
487         
488         # deferred import because proxy needs
489         # our class definitions to define proxies
490         import nepi.util.proxy as proxy
491         
492         conf = ConfigParser.RawConfigParser()
493         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
494         for testbed_guid in conf.sections():
495             testbed_config = proxy.AccessConfiguration()
496             testbed_guid = str(testbed_guid)
497             for attr in testbed_config.get_attribute_list():
498                 if attr not in TRANSIENT:
499                     getter = getattr(conf, TYPEMAP.get(
500                         testbed_config.get_attribute_type(attr),
501                         'get') )
502                     testbed_config.set_attribute_value(
503                         attr, getter(testbed_guid, attr))
504     
505     def _unpersist_testbed_proxies(self):
506         try:
507             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
508         except:
509             # Just print exceptions, this is just cleanup
510             import traceback
511             ######## BUG ##########
512             #BUG: If the next line is uncomented pyQt explodes when shutting down the experiment !!!!!!!!
513             #traceback.print_exc(file=sys.stderr)
514
515     def _update_execute_xml(self):
516         # For all testbeds,
517         #   For all elements in testbed,
518         #       - gather immutable execute-readable attribuets lists
519         #         asynchronously
520         # Generate new design description from design xml
521         # (Wait for attributes lists - implicit syncpoint)
522         # For all testbeds,
523         #   For all elements in testbed,
524         #       - gather all immutable execute-readable attribute
525         #         values, asynchronously
526         # (Wait for attribute values - implicit syncpoint)
527         # For all testbeds,
528         #   For all elements in testbed,
529         #       - inject non-None values into new design
530         # Generate execute xml from new design
531
532         attribute_lists = dict(
533             (testbed_guid, collections.defaultdict(dict))
534             for testbed_guid in self._testbeds
535         )
536         
537         for testbed_guid, testbed in self._testbeds.iteritems():
538             guids = self._guids_in_testbed(testbed_guid)
539             for guid in guids:
540                 attribute_lists[testbed_guid][guid] = \
541                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
542         
543         parser = XmlExperimentParser()
544         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
545
546         attribute_values = dict(
547             (testbed_guid, collections.defaultdict(dict))
548             for testbed_guid in self._testbeds
549         )
550         
551         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
552             testbed = self._testbeds[testbed_guid]
553             for guid, attribute_list in testbed_attribute_lists.iteritems():
554                 attribute_list = _undefer(attribute_list)
555                 attribute_values[testbed_guid][guid] = dict(
556                     (attribute, testbed.get_deferred(guid, attribute))
557                     for attribute in attribute_list
558                 )
559         
560         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
561             for guid, attribute_values in testbed_attribute_values.iteritems():
562                 for attribute, value in attribute_values.iteritems():
563                     value = _undefer(value)
564                     if value is not None:
565                         execute_data.add_attribute_data(guid, attribute, value)
566         
567         self._experiment_execute_xml = parser.to_xml(data=execute_data)
568
569     def stop(self):
570        for testbed in self._testbeds.values():
571            testbed.stop()
572        self._unpersist_testbed_proxies()
573    
    def recover(self):
        """
        Recover the experiment: forget previously failed testbeds, reload
        the persisted access configurations and run _start() in recovery mode.
        """
        # reload previously persisted testbed access configurations
        self._failed_testbeds.clear()
        self._load_testbed_proxies()

        # re-program testbeds that need recovery
        self._start(recover = True)
581
582     def is_finished(self, guid):
583         testbed = self._testbed_for_guid(guid)
584         if testbed != None:
585             return testbed.status(guid) == AS.STATUS_FINISHED
586         raise RuntimeError("No element exists with guid %d" % guid)    
587     
588     def _testbed_recovery_policy(self, guid, data = None):
589         if data is None:
590             parser = XmlExperimentParser()
591             data = parser.from_xml_to_data(self._experiment_design_xml)
592         
593         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
594
595     def status(self, guid):
596         if guid in self._testbeds:
597             # guid is a testbed
598             # report testbed status
599             if guid in self._failed_testbeds:
600                 return TS.STATUS_FAILED
601             else:
602                 try:
603                     return self._testbeds[guid].status()
604                 except:
605                     return TS.STATUS_UNRESPONSIVE
606         else:
607             # guid is an element
608             testbed = self._testbed_for_guid(guid)
609             if testbed is not None:
610                 return testbed.status(guid)
611             else:
612                 return AS.STATUS_UNDETERMINED
613
614     def set(self, guid, name, value, time = TIME_NOW):
615         testbed = self._testbed_for_guid(guid)
616         if testbed != None:
617             testbed.set(guid, name, value, time)
618         else:
619             raise RuntimeError("No element exists with guid %d" % guid)    
620
621     def get(self, guid, name, time = TIME_NOW):
622         testbed = self._testbed_for_guid(guid)
623         if testbed != None:
624             return testbed.get(guid, name, time)
625         raise RuntimeError("No element exists with guid %d" % guid)    
626
627     def get_deferred(self, guid, name, time = TIME_NOW):
628         testbed = self._testbed_for_guid(guid)
629         if testbed != None:
630             return testbed.get_deferred(guid, name, time)
631         raise RuntimeError("No element exists with guid %d" % guid)    
632
633     def get_factory_id(self, guid):
634         testbed = self._testbed_for_guid(guid)
635         if testbed != None:
636             return testbed.get_factory_id(guid)
637         raise RuntimeError("No element exists with guid %d" % guid)    
638
639     def get_testbed_id(self, guid):
640         testbed = self._testbed_for_guid(guid)
641         if testbed != None:
642             return testbed.testbed_id
643         raise RuntimeError("No element exists with guid %d" % guid)    
644
645     def get_testbed_version(self, guid):
646         testbed = self._testbed_for_guid(guid)
647         if testbed != None:
648             return testbed.testbed_version
649         raise RuntimeError("No element exists with guid %d" % guid)    
650
651     def shutdown(self):
652         exceptions = list()
653         for testbed in self._testbeds.values():
654             try:
655                 testbed.shutdown()
656             except:
657                 exceptions.append(sys.exc_info())
658         for exc_info in exceptions:
659             raise exc_info[0], exc_info[1], exc_info[2]
660
661     def _testbed_for_guid(self, guid):
662         for testbed_guid in self._testbeds.keys():
663             if guid in self._guids_in_testbed(testbed_guid):
664                 if testbed_guid in self._failed_testbeds:
665                     return None
666                 return self._testbeds[testbed_guid]
667         return None
668
669     def _guids_in_testbed(self, testbed_guid):
670         if testbed_guid not in self._testbeds:
671             return set()
672         if testbed_guid not in self._guids_in_testbed_cache:
673             self._guids_in_testbed_cache[testbed_guid] = \
674                 set(self._testbeds[testbed_guid].guids)
675         return self._guids_in_testbed_cache[testbed_guid]
676
677     @staticmethod
678     def _netref_component_split(component):
679         match = COMPONENT_PATTERN.match(component)
680         if match:
681             return match.group("kind"), match.group("index")
682         else:
683             return component, None
684
    # Maps a netref component kind to the function that fetches the
    # referenced value from a testbed. All of them take
    # (testbed, guid, index, name); index arrives as a string (or None
    # when the netref has no component).
    _NETREF_COMPONENT_GETTERS = {
        # address entry: index is converted to an integer
        'addr':
            lambda testbed, guid, index, name: 
                testbed.get_address(guid, int(index), name),
        # routing entry: index is converted to an integer
        'route' :
            lambda testbed, guid, index, name: 
                testbed.get_route(guid, int(index), name),
        # trace: index is handed over as it is, as the trace id
        'trace' :
            lambda testbed, guid, index, name: 
                testbed.trace(guid, index, name),
        # no component: plain attribute of the element, index is ignored
        '' : 
            lambda testbed, guid, index, name: 
                testbed.get(guid, name),
    }
699     
700     def resolve_netref_value(self, value, failval = None):
701         rv = failval
702         while True:
703             for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
704                 label = match.group("label")
705                 if label.startswith('GUID-'):
706                     ref_guid = int(label[5:])
707                     if ref_guid:
708                         expr = match.group("expr")
709                         component = (match.group("component") or "")[1:] # skip the dot
710                         attribute = match.group("attribute")
711                         
712                         # split compound components into component kind and index
713                         # eg: 'addr[0]' -> ('addr', '0')
714                         component, component_index = self._netref_component_split(component)
715
716                         # find object and resolve expression
717                         for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
718                             if component not in self._NETREF_COMPONENT_GETTERS:
719                                 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
720                             elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
721                                 pass
722                             else:
723                                 ref_value = self._NETREF_COMPONENT_GETTERS[component](
724                                     ref_testbed, ref_guid, component_index, attribute)
725                                 if ref_value:
726                                     value = rv = value.replace(match.group(), ref_value)
727                                     break
728                         else:
729                             # unresolvable netref
730                             return failval
731                         break
732             else:
733                 break
734         return rv
735     
736     def do_netrefs(self, data, fail_if_undefined = False):
737         # element netrefs
738         for (testbed_guid, guid), attrs in self._netrefs.items():
739             testbed = self._testbeds.get(testbed_guid)
740             if testbed is not None:
741                 for name in set(attrs):
742                     value = testbed.get(guid, name)
743                     if isinstance(value, basestring):
744                         ref_value = self.resolve_netref_value(value)
745                         if ref_value is not None:
746                             testbed.set(guid, name, ref_value)
747                             attrs.remove(name)
748                         elif fail_if_undefined:
749                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
750                 if not attrs:
751                     del self._netrefs[(testbed_guid, guid)]
752         
753         # testbed netrefs
754         for testbed_guid, attrs in self._testbed_netrefs.items():
755             tb_data = dict(data.get_attribute_data(testbed_guid))
756             if data:
757                 for name in set(attrs):
758                     value = tb_data.get(name)
759                     if isinstance(value, basestring):
760                         ref_value = self.resolve_netref_value(value)
761                         if ref_value is not None:
762                             data.set_attribute_data(testbed_guid, name, ref_value)
763                             attrs.remove(name)
764                         elif fail_if_undefined:
765                             raise ValueError, "Unresolvable netref in: %r" % (value,)
766                 if not attrs:
767                     del self._testbed_netrefs[testbed_guid]
768         
769
770     def _init_testbed_controllers(self, data, recover = False):
771         blacklist_testbeds = set(self._testbeds)
772         element_guids = list()
773         label_guids = dict()
774         data_guids = data.guids
775         to_recover = set()
776         to_restart = set()
777
778         # gather label associations
779         for guid in data_guids:
780             if not data.is_testbed_data(guid):
781                 (testbed_guid, factory_id) = data.get_box_data(guid)
782                 label = data.get_attribute_data(guid, "label")
783                 if label is not None:
784                     if label in label_guids:
785                         raise RuntimeError, "Label %r is not unique" % (label,)
786                     label_guids[label] = guid
787
788         # create testbed controllers
789         for guid in data_guids:
790             if data.is_testbed_data(guid):
791                 if guid not in self._testbeds:
792                     try:
793                         self._create_testbed_controller(
794                             guid, data, element_guids, recover)
795                         if recover:
796                             # Already programmed
797                             blacklist_testbeds.add(guid)
798                         else:
799                             to_restart.add(guid)
800                     except:
801                         if recover:
802                             policy = self._testbed_recovery_policy(guid, data=data)
803                             if policy == DC.POLICY_RECOVER:
804                                 self._create_testbed_controller(
805                                     guid, data, element_guids, False)
806                                 to_recover.add(guid)
807                             elif policy == DC.POLICY_RESTART:
808                                 self._create_testbed_controller(
809                                     guid, data, element_guids, False)
810                                 to_restart.add(guid)
811                             else:
812                                 # Mark failed
813                                 self._failed_testbeds.add(guid)
814                         else:
815                             raise
816         
817         # queue programmable elements
818         #  - that have not been programmed already (blacklist_testbeds)
819         #  - including recovered or restarted testbeds
820         #  - but those that have no unresolved netrefs
821         for guid in data_guids:
822             if not data.is_testbed_data(guid):
823                 (testbed_guid, factory_id) = data.get_box_data(guid)
824                 if testbed_guid not in blacklist_testbeds:
825                     element_guids.append(guid)
826
827         # replace references to elements labels for its guid
828         self._resolve_labels(data, data_guids, label_guids)
829     
830         # program testbed controllers
831         if element_guids:
832             self._program_testbed_controllers(element_guids, data)
833         
834         return to_recover, to_restart
835
836     def _resolve_labels(self, data, data_guids, label_guids):
837         netrefs = self._netrefs
838         testbed_netrefs = self._testbed_netrefs
839         for guid in data_guids:
840             for name, value in data.get_attribute_data(guid):
841                 if isinstance(value, basestring):
842                     while True:
843                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
844                             label = match.group("label")
845                             if not label.startswith('GUID-'):
846                                 ref_guid = label_guids.get(label)
847                                 if ref_guid is not None:
848                                     value = ATTRIBUTE_PATTERN_BASE.sub(
849                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
850                                             guid = 'GUID-%d' % (ref_guid,),
851                                             expr = match.group("expr"),
852                                             label = label), 
853                                         value)
854                                     data.set_attribute_data(guid, name, value)
855                                     
856                                     # memorize which guid-attribute pairs require
857                                     # postprocessing, to avoid excessive controller-testbed
858                                     # communication at configuration time
859                                     # (which could require high-latency network I/O)
860                                     if not data.is_testbed_data(guid):
861                                         (testbed_guid, factory_id) = data.get_box_data(guid)
862                                         netrefs[(testbed_guid, guid)].add(name)
863                                     else:
864                                         testbed_netrefs[guid].add(name)
865                                     
866                                     break
867                         else:
868                             break
869
870     def _create_testbed_controller(self, guid, data, element_guids, recover):
871         (testbed_id, testbed_version) = data.get_testbed_data(guid)
872         deployment_config = self._deployment_config.get(guid)
873         
874         # deferred import because proxy needs
875         # our class definitions to define proxies
876         import nepi.util.proxy as proxy
877         
878         if deployment_config is None:
879             # need to create one
880             deployment_config = proxy.AccessConfiguration()
881             
882             for (name, value) in data.get_attribute_data(guid):
883                 if value is not None and deployment_config.has_attribute(name):
884                     # if any deployment config attribute has a netref, we can't
885                     # create this controller yet
886                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
887                         # remember to re-issue this one
888                         self._netreffed_testbeds.add(guid)
889                         return
890                     
891                     # copy deployment config attribute
892                     deployment_config.set_attribute_value(name, value)
893             
894             # commit config
895             self._deployment_config[guid] = deployment_config
896         
897         if deployment_config is not None:
898             # force recovery mode 
899             deployment_config.set_attribute_value("recover",recover)
900         
901         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
902                 deployment_config)
903         for (name, value) in data.get_attribute_data(guid):
904             testbed.defer_configure(name, value)
905         self._testbeds[guid] = testbed
906         if guid in self._netreffed_testbeds:
907             self._netreffed_testbeds.remove(guid)
908
909     def _program_testbed_controllers(self, element_guids, data):
910         def resolve_create_netref(data, guid, name, value): 
911             # Try to resolve create-time netrefs, if possible
912             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
913                 try:
914                     nuvalue = self.resolve_netref_value(value)
915                 except:
916                     # Any trouble means we're not in shape to resolve the netref yet
917                     nuvalue = None
918                 if nuvalue is not None:
919                     # Only if we succeed we remove the netref deferral entry
920                     value = nuvalue
921                     data.set_attribute_data(guid, name, value)
922                     if (testbed_guid, guid) in self._netrefs:
923                         self._netrefs[(testbed_guid, guid)].discard(name)
924             return value
925
926         for guid in element_guids:
927             (testbed_guid, factory_id) = data.get_box_data(guid)
928             testbed = self._testbeds.get(testbed_guid)
929             if testbed is not None:
930                 # create
931                 testbed.defer_create(guid, factory_id)
932                 # set attributes
933                 for (name, value) in data.get_attribute_data(guid):
934                     value = resolve_create_netref(data, guid, name, value)
935                     testbed.defer_create_set(guid, name, value)
936
937         for guid in element_guids:
938             (testbed_guid, factory_id) = data.get_box_data(guid)
939             testbed = self._testbeds.get(testbed_guid)
940             if testbed is not None:
941                 # traces
942                 for trace_id in data.get_trace_data(guid):
943                     testbed.defer_add_trace(guid, trace_id)
944                 # addresses
945                 for (address, netprefix, broadcast) in data.get_address_data(guid):
946                     if address != None:
947                         testbed.defer_add_address(guid, address, netprefix, 
948                                 broadcast)
949                 # routes
950                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
951                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
952                 # store connections data
953                 for (connector_type_name, other_guid, other_connector_type_name) \
954                         in data.get_connection_data(guid):
955                     (other_testbed_guid, other_factory_id) = data.get_box_data(
956                             other_guid)
957                     if testbed_guid == other_testbed_guid:
958                         # each testbed should take care of enforcing internal
959                         # connection simmetry, so each connection is only
960                         # added in one direction
961                         testbed.defer_connect(guid, connector_type_name, 
962                                 other_guid, other_connector_type_name)
963
964     def _program_testbed_cross_connections(self, data):
965         data_guids = data.guids
966         for guid in data_guids: 
967             if not data.is_testbed_data(guid):
968                 (testbed_guid, factory_id) = data.get_box_data(guid)
969                 testbed = self._testbeds.get(testbed_guid)
970                 if testbed is not None:
971                     for (connector_type_name, cross_guid, cross_connector_type_name) \
972                             in data.get_connection_data(guid):
973                         (testbed_guid, factory_id) = data.get_box_data(guid)
974                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
975                                 cross_guid)
976                         if testbed_guid != cross_testbed_guid:
977                             cross_testbed = self._testbeds[cross_testbed_guid]
978                             cross_testbed_id = cross_testbed.testbed_id
979                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
980                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
981                                     cross_connector_type_name)
982                             # save cross data for later
983                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
984                                     cross_guid)
985
986     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
987         if testbed_guid not in self._cross_data:
988             self._cross_data[testbed_guid] = dict()
989         if cross_testbed_guid not in self._cross_data[testbed_guid]:
990             self._cross_data[testbed_guid][cross_testbed_guid] = set()
991         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
992
993     def _get_cross_data(self, testbed_guid):
994         cross_data = dict()
995         if not testbed_guid in self._cross_data:
996             return cross_data
997
998         # fetch attribute lists in one batch
999         attribute_lists = dict()
1000         for cross_testbed_guid, guid_list in \
1001                 self._cross_data[testbed_guid].iteritems():
1002             cross_testbed = self._testbeds[cross_testbed_guid]
1003             for cross_guid in guid_list:
1004                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1005                     cross_testbed.get_attribute_list_deferred(cross_guid)
1006
1007         # fetch attribute values in another batch
1008         for cross_testbed_guid, guid_list in \
1009                 self._cross_data[testbed_guid].iteritems():
1010             cross_data[cross_testbed_guid] = dict()
1011             cross_testbed = self._testbeds[cross_testbed_guid]
1012             for cross_guid in guid_list:
1013                 elem_cross_data = dict(
1014                     _guid = cross_guid,
1015                     _testbed_guid = cross_testbed_guid,
1016                     _testbed_id = cross_testbed.testbed_id,
1017                     _testbed_version = cross_testbed.testbed_version)
1018                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1019                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1020                 for attr_name in attribute_list:
1021                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1022                     elem_cross_data[attr_name] = attr_value
1023         
1024         # undefer all values - we'll have to serialize them probably later
1025         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1026             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1027                 for attr_name, attr_value in elem_cross_data.iteritems():
1028                     elem_cross_data[attr_name] = _undefer(attr_value)
1029         
1030         return cross_data
1031