The loglevel of logging is now set from the ExperimentController depending on the...
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16 import logging
17 logging.basicConfig()
18
# Matches a network reference ("netref") embedded in an attribute value, e.g.
#   {#[GUID-5].addr[0].[Address]#}   or   {#[some-label].[attribute]#}
# Named groups:
#   label     - text between the first pair of brackets
#   expr      - everything after the label, up to the closing "#}"
#   component - optional ".addr[N]", ".route[N]" or ".trace[name]" part
#   attribute - text between the last pair of brackets
# NOTE(review): the "." right before "\[(?P<attribute>" is unescaped, so it
# matches any single character, not only a literal dot - confirm intent.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-format template for a netref; expects the mapping keys "guid" and "expr".
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into kind ("addr") and
# index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
22
23 def _undefer(deferred):
24     if hasattr(deferred, '_get'):
25         return deferred._get()
26     else:
27         return deferred
28
29
class TestbedController(object):
    """Abstract interface of a testbed controller.

    Apart from the constructor and the two identification properties, every
    member raises NotImplementedError and has to be provided by a concrete
    subclass.
    """

    def __init__(self, testbed_id, testbed_version):
        """Store the testbed identifier and version."""
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction time."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction time."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids known to this testbed (abstract)."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element """
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and 
        their attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Start the testbed (abstract)."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed (abstract)."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute `name` of element `guid` to `value` (abstract)."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Return attribute `name` of element `guid` (abstract)."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """List attributes of element `guid` (abstract).

        NOTE(review): semantics of `filter_flags` / `exclude` are defined by
        the implementations, not visible here.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id of element `guid` (abstract)."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Abstract; presumably schedules `action` on `guid` at `time` - confirm."""
        raise NotImplementedError

    def status(self, guid):
        """Return the status of element `guid` (abstract)."""
        raise NotImplementedError
    
    def testbed_status(self):
        """Return the status of the testbed itself (abstract)."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return `attribute` of trace `trace_id` of element `guid` (abstract)."""
        raise NotImplementedError

    def traces_info(self):
        """Return information about available traces (abstract).

        Expected shape, a dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Shut the testbed controller down (abstract)."""
        raise NotImplementedError
228
229 class ExperimentController(object):
230     def __init__(self, experiment_xml, root_dir):
231         self._experiment_design_xml = experiment_xml
232         self._experiment_execute_xml = None
233         self._testbeds = dict()
234         self._deployment_config = dict()
235         self._netrefs = collections.defaultdict(set)
236         self._testbed_netrefs = collections.defaultdict(set)
237         self._cross_data = dict()
238         self._root_dir = root_dir
239         self._netreffed_testbeds = set()
240         self._guids_in_testbed_cache = dict()
241         self._failed_testbeds = set()
242         self._started_time = None
243         self._stopped_time = None
244       
245         self._logger = logging.getLogger('nepi.core.execute')
246         level = logging.ERROR
247         if os.environ.get("NEPI_CONTROLLER_LOGLEVEL", 
248                 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
249             level = logging.DEBUG
250         self._logger.setLevel(level)
251  
252         if experiment_xml is None and root_dir is not None:
253             # Recover
254             self.load_experiment_xml()
255             self.load_execute_xml()
256         else:
257             self.persist_experiment_xml()
258
    @property
    def experiment_design_xml(self):
        """Design XML given to the constructor or loaded from disk."""
        return self._experiment_design_xml
262
    @property
    def experiment_execute_xml(self):
        """Execute XML; None until it has been generated or loaded."""
        return self._experiment_execute_xml
266
    @property
    def started_time(self):
        """time.time() value recorded by start(), or None if not started."""
        return self._started_time
270
    @property
    def stopped_time(self):
        """time.time() value recorded by stop(), or None if not stopped."""
        return self._stopped_time
274
275     @property
276     def guids(self):
277         guids = list()
278         for testbed_guid in self._testbeds.keys():
279             _guids = self._guids_in_testbed(testbed_guid)
280             if _guids:
281                 guids.extend(_guids)
282         return guids
283
284     def persist_experiment_xml(self):
285         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
286         f = open(xml_path, "w")
287         f.write(self._experiment_design_xml)
288         f.close()
289
290     def persist_execute_xml(self):
291         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
292         f = open(xml_path, "w")
293         f.write(self._experiment_execute_xml)
294         f.close()
295
296     def load_experiment_xml(self):
297         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
298         f = open(xml_path, "r")
299         self._experiment_design_xml = f.read()
300         f.close()
301
302     def load_execute_xml(self):
303         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
304         f = open(xml_path, "r")
305         self._experiment_execute_xml = f.read()
306         f.close()
307
308     def trace(self, guid, trace_id, attribute='value'):
309         testbed = self._testbed_for_guid(guid)
310         if testbed != None:
311             return testbed.trace(guid, trace_id, attribute)
312         raise RuntimeError("No element exists with guid %d" % guid)    
313
314     def traces_info(self):
315         traces_info = dict()
316         for guid, testbed in self._testbeds.iteritems():
317             tinfo = testbed.traces_info()
318             if tinfo:
319                 traces_info[guid] = testbed.traces_info()
320         return traces_info
321
322     @staticmethod
323     def _parallel(callables):
324         excs = []
325         def wrap(callable):
326             @functools.wraps(callable)
327             def wrapped(*p, **kw):
328                 try:
329                     callable(*p, **kw)
330                 except:
331                     logging.exception("Exception occurred in asynchronous thread:")
332                     excs.append(sys.exc_info())
333             return wrapped
334         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
335         for thread in threads:
336             thread.start()
337         for thread in threads:
338             thread.join()
339         for exc in excs:
340             eTyp, eVal, eLoc = exc
341             raise eTyp, eVal, eLoc
342
    def start(self):
        """Record the start time and run the (non-recovery) start sequence."""
        self._started_time = time.time() 
        self._start()
346
    def _start(self, recover = False):
        """Deploy (or recover) the experiment on all testbeds and start it.

        With ``recover`` False the design XML is used and testbed access
        data plus the execute XML are persisted. With ``recover`` True the
        execute XML is used, recoverable testbeds get do_setup() + recover()
        and nothing is persisted.

        Sequence: instantiate controllers -> setup/create/connect/preconfigure
        -> resolve netrefs (repeating the previous steps for testbeds that
        depended on netrefs) -> program cross connections -> do_configure ->
        cross-connect -> do_prestart -> strict netref resolution -> persist
        execute XML -> start.
        """
        parser = XmlExperimentParser()
        
        if recover:
            xml = self._experiment_execute_xml
        else:
            xml = self._experiment_design_xml
        data = parser.from_xml_to_data(xml)

        # instantiate testbed controllers
        to_recover, to_restart = self._init_testbed_controllers(data, recover)
        # accumulates every testbed guid that goes through the full sequence
        all_restart = set(to_restart)
        
        if not recover:
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()
        else:
            # recover recoverable controllers
            for guid in to_recover:
                try:
                    self._testbeds[guid].do_setup()
                    self._testbeds[guid].recover()
                except:
                    # a failed recovery must not abort the others
                    self._logger.exception("During recovery of testbed %s", guid)
                    
                    # Mark failed
                    self._failed_testbeds.add(guid)
    
        def steps_to_configure(self, allowed_guids):
            # Runs the pre-netref stages, each one in parallel over the
            # testbeds whose guid is in allowed_guids; every stage waits for
            # all threads to finish before the next one begins.

            self._logger.debug("ExperimentController: Starting parallel do_setup")
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._logger.debug("ExperimentController: Starting parallel do_create")
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_connect_init")
            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        steps_to_configure(self, to_restart)

        self._logger.debug("ExperimentController: Resolving netreffed testbeds")
        if self._netreffed_testbeds:
            # initially resolve netrefs (unresolved ones are tolerated here)
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            # NOTE(review): this local is never read afterwards
            netreffed_testbeds = set(self._netreffed_testbeds)

            to_recover, to_restart = self._init_testbed_controllers(data, recover)
            all_restart.update(to_restart)
            
            if not recover:
                # persist testbed connection data, for potential recovery
                self._persist_testbed_proxies()
            else:
                # recover recoverable controllers
                for guid in to_recover:
                    try:
                        self._testbeds[guid].do_setup()
                        self._testbeds[guid].recover()
                    except:
                        self._logger.exception("During recovery of testbed %s", guid)

                        # Mark failed
                        self._failed_testbeds.add(guid)

            # configure dependant testbeds
            steps_to_configure(self, to_restart)
        
        # from here on all_restart holds controller objects, not guids
        all_restart = [ self._testbeds[guid] for guid in all_restart ]
            
        # another tolerant netref pass; the strict one (fail_if_undefined=True)
        # only happens after do_prestart, further below
        self._logger.debug("ExperimentController: Resolving do_netrefs")
        self.do_netrefs(data, fail_if_undefined=False)
       
        # Only now that netref dependencies have been solved is it safe to
        # program cross_connections
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._logger.debug("ExperimentController: Starting parallel do_configure")
        self._parallel([testbed.do_configure
                        for testbed in all_restart])

        self._clear_caches()

        #print >>sys.stderr, "DO IT"
        #import time
        #time.sleep(60)
        
        # cross-connect (cannot be done in parallel)
        # note: iterates over every testbed, not only the restarted ones
        self._logger.debug("ExperimentController: Starting cross-connect")
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._logger.debug("ExperimentController: Starting parallel do_prestart")
        self._parallel([testbed.do_prestart
                        for testbed in all_restart])

        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
 
        self._clear_caches()
        
        if not recover:
            # update execution xml with execution-specific values
            # TODO: known bug - serialization cannot cope with every attribute
            # value (e.g. tun_key, which is non-ASCII)
            self._update_execute_xml()
            self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._logger.debug("ExperimentController: Starting parallel do_start")
        self._parallel([testbed.start
                        for testbed in all_restart])

        self._clear_caches()
493
494     def _clear_caches(self):
495         # Cleaning cache for safety.
496         self._guids_in_testbed_cache = dict()
497
498     def _persist_testbed_proxies(self):
499         TRANSIENT = (DC.RECOVER,)
500         
501         # persist access configuration for all testbeds, so that
502         # recovery mode can reconnect to them if it becomes necessary
503         conf = ConfigParser.RawConfigParser()
504         for testbed_guid, testbed_config in self._deployment_config.iteritems():
505             testbed_guid = str(testbed_guid)
506             conf.add_section(testbed_guid)
507             for attr in testbed_config.get_attribute_list():
508                 if attr not in TRANSIENT:
509                     conf.set(testbed_guid, attr, 
510                         testbed_config.get_attribute_value(attr))
511         
512         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
513         conf.write(f)
514         f.close()
515     
516     def _load_testbed_proxies(self):
517         TYPEMAP = {
518             Attribute.STRING : 'get',
519             Attribute.BOOL : 'getboolean',
520             Attribute.ENUM : 'get',
521             Attribute.DOUBLE : 'getfloat',
522             Attribute.INTEGER : 'getint',
523         }
524         
525         TRANSIENT = (DC.RECOVER,)
526         
527         # deferred import because proxy needs
528         # our class definitions to define proxies
529         import nepi.util.proxy as proxy
530         
531         conf = ConfigParser.RawConfigParser()
532         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
533         for testbed_guid in conf.sections():
534             testbed_config = proxy.AccessConfiguration()
535             testbed_guid = str(testbed_guid)
536             for attr in testbed_config.get_attribute_list():
537                 if attr not in TRANSIENT:
538                     getter = getattr(conf, TYPEMAP.get(
539                         testbed_config.get_attribute_type(attr),
540                         'get') )
541                     testbed_config.set_attribute_value(
542                         attr, getter(testbed_guid, attr))
543     
544     def _unpersist_testbed_proxies(self):
545         try:
546             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
547         except:
548             # Just print exceptions, this is just cleanup
549             self._logger.exception("Loading testbed configuration")
550
551     def _update_execute_xml(self):
552         # For all testbeds,
553         #   For all elements in testbed,
554         #       - gather immutable execute-readable attribuets lists
555         #         asynchronously
556         # Generate new design description from design xml
557         # (Wait for attributes lists - implicit syncpoint)
558         # For all testbeds,
559         #   For all elements in testbed,
560         #       - gather all immutable execute-readable attribute
561         #         values, asynchronously
562         # (Wait for attribute values - implicit syncpoint)
563         # For all testbeds,
564         #   For all elements in testbed,
565         #       - inject non-None values into new design
566         # Generate execute xml from new design
567
568         attribute_lists = dict(
569             (testbed_guid, collections.defaultdict(dict))
570             for testbed_guid in self._testbeds
571         )
572         
573         for testbed_guid, testbed in self._testbeds.iteritems():
574             guids = self._guids_in_testbed(testbed_guid)
575             for guid in guids:
576                 attribute_lists[testbed_guid][guid] = \
577                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
578         
579         parser = XmlExperimentParser()
580         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
581
582         attribute_values = dict(
583             (testbed_guid, collections.defaultdict(dict))
584             for testbed_guid in self._testbeds
585         )
586         
587         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
588             testbed = self._testbeds[testbed_guid]
589             for guid, attribute_list in testbed_attribute_lists.iteritems():
590                 attribute_list = _undefer(attribute_list)
591                 attribute_values[testbed_guid][guid] = dict(
592                     (attribute, testbed.get_deferred(guid, attribute))
593                     for attribute in attribute_list
594                 )
595         
596         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
597             for guid, attribute_values in testbed_attribute_values.iteritems():
598                 for attribute, value in attribute_values.iteritems():
599                     value = _undefer(value)
600                     if value is not None:
601                         execute_data.add_attribute_data(guid, attribute, value)
602         
603         self._experiment_execute_xml = parser.to_xml(data=execute_data)
604
605     def stop(self):
606        for testbed in self._testbeds.values():
607            testbed.stop()
608        self._unpersist_testbed_proxies()
609        self._stopped_time = time.time() 
610    
    def recover(self):
        """Re-run the start sequence in recovery mode.

        Clears the set of failed testbeds, reloads the persisted testbed
        access configuration and calls ``_start(recover=True)``.
        """
        # reload previously persisted testbed access configurations
        self._failed_testbeds.clear()
        self._load_testbed_proxies()

        # re-program testbeds that need recovery
        self._start(recover = True)
618
619     def is_finished(self, guid):
620         testbed = self._testbed_for_guid(guid)
621         if testbed != None:
622             return testbed.status(guid) == AS.STATUS_FINISHED
623         raise RuntimeError("No element exists with guid %d" % guid)    
624     
625     def _testbed_recovery_policy(self, guid, data = None):
626         if data is None:
627             parser = XmlExperimentParser()
628             data = parser.from_xml_to_data(self._experiment_design_xml)
629         
630         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
631
632     def status(self, guid):
633         if guid in self._testbeds:
634             # guid is a testbed
635             # report testbed status
636             if guid in self._failed_testbeds:
637                 return TS.STATUS_FAILED
638             else:
639                 try:
640                     return self._testbeds[guid].status()
641                 except:
642                     return TS.STATUS_UNRESPONSIVE
643         else:
644             # guid is an element
645             testbed = self._testbed_for_guid(guid)
646             if testbed is not None:
647                 return testbed.status(guid)
648             else:
649                 return AS.STATUS_UNDETERMINED
650
651     def set(self, guid, name, value, time = TIME_NOW):
652         testbed = self._testbed_for_guid(guid)
653         if testbed != None:
654             testbed.set(guid, name, value, time)
655         else:
656             raise RuntimeError("No element exists with guid %d" % guid)    
657
658     def get(self, guid, name, time = TIME_NOW):
659         testbed = self._testbed_for_guid(guid)
660         if testbed != None:
661             return testbed.get(guid, name, time)
662         raise RuntimeError("No element exists with guid %d" % guid)    
663
664     def get_deferred(self, guid, name, time = TIME_NOW):
665         testbed = self._testbed_for_guid(guid)
666         if testbed != None:
667             return testbed.get_deferred(guid, name, time)
668         raise RuntimeError("No element exists with guid %d" % guid)    
669
670     def get_factory_id(self, guid):
671         testbed = self._testbed_for_guid(guid)
672         if testbed != None:
673             return testbed.get_factory_id(guid)
674         raise RuntimeError("No element exists with guid %d" % guid)    
675
676     def get_testbed_id(self, guid):
677         testbed = self._testbed_for_guid(guid)
678         if testbed != None:
679             return testbed.testbed_id
680         raise RuntimeError("No element exists with guid %d" % guid)    
681
682     def get_testbed_version(self, guid):
683         testbed = self._testbed_for_guid(guid)
684         if testbed != None:
685             return testbed.testbed_version
686         raise RuntimeError("No element exists with guid %d" % guid)    
687
688     def shutdown(self):
689         exceptions = list()
690         for testbed in self._testbeds.values():
691             try:
692                 testbed.shutdown()
693             except:
694                 exceptions.append(sys.exc_info())
695         for exc_info in exceptions:
696             raise exc_info[0], exc_info[1], exc_info[2]
697
698     def _testbed_for_guid(self, guid):
699         for testbed_guid in self._testbeds.keys():
700             if guid in self._guids_in_testbed(testbed_guid):
701                 if testbed_guid in self._failed_testbeds:
702                     return None
703                 return self._testbeds[testbed_guid]
704         return None
705
706     def _guids_in_testbed(self, testbed_guid):
707         if testbed_guid not in self._testbeds:
708             return set()
709         if testbed_guid not in self._guids_in_testbed_cache:
710             self._guids_in_testbed_cache[testbed_guid] = \
711                 set(self._testbeds[testbed_guid].guids)
712         return self._guids_in_testbed_cache[testbed_guid]
713
714     @staticmethod
715     def _netref_component_split(component):
716         match = COMPONENT_PATTERN.match(component)
717         if match:
718             return match.group("kind"), match.group("index")
719         else:
720             return component, None
721
722     _NETREF_COMPONENT_GETTERS = {
723         'addr':
724             lambda testbed, guid, index, name: 
725                 testbed.get_address(guid, int(index), name),
726         'route' :
727             lambda testbed, guid, index, name: 
728                 testbed.get_route(guid, int(index), name),
729         'trace' :
730             lambda testbed, guid, index, name: 
731                 testbed.trace(guid, index, attribute = name),
732         '' : 
733             lambda testbed, guid, index, name: 
734                 testbed.get(guid, name),
735     }
736     
    def resolve_netref_value(self, value, failval = None):
        """Substitute the netrefs found in ``value`` with their actual values.

        ``value`` is scanned for ATTRIBUTE_PATTERN_BASE matches. For the
        first match whose label is ``GUID-<n>`` (n non-zero), the testbed
        containing guid n is queried through the getter of the match's
        component; a truthy answer replaces the matched text and the scan
        restarts on the updated string.

        Returns:
            The string with substitutions applied, or ``failval`` when
            nothing was substituted or when some netref could not be
            resolved (no testbed contains the guid, or the getter returned
            a falsy value).

        Raises:
            ValueError: for a component kind missing from
                _NETREF_COMPONENT_GETTERS (only checked while iterating
                testbeds, i.e. when at least one testbed exists).
        """
        rv = failval
        while True:
            for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
                label = match.group("label")
                # labels not of the form GUID-<n> are skipped here
                if label.startswith('GUID-'):
                    ref_guid = int(label[5:])
                    # guid 0 is skipped as well
                    if ref_guid:
                        expr = match.group("expr")
                        component = (match.group("component") or "")[1:] # skip the dot
                        attribute = match.group("attribute")
                        
                        # split compound components into component kind and index
                        # eg: 'addr[0]' -> ('addr', '0')
                        component, component_index = self._netref_component_split(component)

                        # find object and resolve expression
                        for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
                            if component not in self._NETREF_COMPONENT_GETTERS:
                                raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
                            elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
                                pass
                            else:
                                ref_value = self._NETREF_COMPONENT_GETTERS[component](
                                    ref_testbed, ref_guid, component_index, attribute)
                                if ref_value:
                                    # str.replace swaps every occurrence of
                                    # this exact netref text
                                    value = rv = value.replace(match.group(), ref_value)
                                    break
                        else:
                            # unresolvable netref
                            return failval
                        # value changed: leave the finditer loop and rescan
                        break
            else:
                # scan finished without a substitution: nothing left to do
                break
        return rv
772     
    def do_netrefs(self, data, fail_if_undefined = False):
        """Resolve the pending netrefs of elements and of testbeds.

        Element netrefs (self._netrefs) are read from and written back to
        the owning testbed controller; testbed netrefs
        (self._testbed_netrefs) are read from and written back to ``data``.
        Resolved attribute names are removed from the pending sets, and
        entries whose set becomes empty are deleted.

        Raises:
            ValueError: when ``fail_if_undefined`` is true and a string
                value could not be resolved (resolve_netref_value returned
                None).
        """
        # element netrefs
        # .items() is a list snapshot (Python 2), so deleting entries from
        # the dict inside the loop is safe
        for (testbed_guid, guid), attrs in self._netrefs.items():
            testbed = self._testbeds.get(testbed_guid)
            if testbed is not None:
                # iterate over a copy, attrs is modified inside the loop
                for name in set(attrs):
                    value = testbed.get(guid, name)
                    if isinstance(value, basestring):
                        ref_value = self.resolve_netref_value(value)
                        if ref_value is not None:
                            testbed.set(guid, name, ref_value)
                            attrs.remove(name)
                        elif fail_if_undefined:
                            raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
                if not attrs:
                    del self._netrefs[(testbed_guid, guid)]
        
        # testbed netrefs
        for testbed_guid, attrs in self._testbed_netrefs.items():
            tb_data = dict(data.get_attribute_data(testbed_guid))
            # NOTE(review): this tests `data`, not `tb_data`; it looks like
            # `if tb_data:` was intended - confirm before changing.
            if data:
                for name in set(attrs):
                    value = tb_data.get(name)
                    if isinstance(value, basestring):
                        ref_value = self.resolve_netref_value(value)
                        if ref_value is not None:
                            data.set_attribute_data(testbed_guid, name, ref_value)
                            attrs.remove(name)
                        elif fail_if_undefined:
                            raise ValueError, "Unresolvable netref in: %r" % (value,)
                if not attrs:
                    del self._testbed_netrefs[testbed_guid]
805         
806
807     def _init_testbed_controllers(self, data, recover = False):
808         blacklist_testbeds = set(self._testbeds)
809         element_guids = list()
810         label_guids = dict()
811         data_guids = data.guids
812         to_recover = set()
813         to_restart = set()
814
815         # gather label associations
816         for guid in data_guids:
817             if not data.is_testbed_data(guid):
818                 (testbed_guid, factory_id) = data.get_box_data(guid)
819                 label = data.get_attribute_data(guid, "label")
820                 if label is not None:
821                     if label in label_guids:
822                         raise RuntimeError, "Label %r is not unique" % (label,)
823                     label_guids[label] = guid
824
825         # create testbed controllers
826         for guid in data_guids:
827             if data.is_testbed_data(guid):
828                 if guid not in self._testbeds:
829                     try:
830                         self._create_testbed_controller(
831                             guid, data, element_guids, recover)
832                         if recover:
833                             # Already programmed
834                             blacklist_testbeds.add(guid)
835                         else:
836                             to_restart.add(guid)
837                     except:
838                         if recover:
839                             policy = self._testbed_recovery_policy(guid, data=data)
840                             if policy == DC.POLICY_RECOVER:
841                                 self._create_testbed_controller(
842                                     guid, data, element_guids, False)
843                                 to_recover.add(guid)
844                             elif policy == DC.POLICY_RESTART:
845                                 self._create_testbed_controller(
846                                     guid, data, element_guids, False)
847                                 to_restart.add(guid)
848                             else:
849                                 # Mark failed
850                                 self._failed_testbeds.add(guid)
851                         else:
852                             raise
853         
854         # queue programmable elements
855         #  - that have not been programmed already (blacklist_testbeds)
856         #  - including recovered or restarted testbeds
857         #  - but those that have no unresolved netrefs
858         for guid in data_guids:
859             if not data.is_testbed_data(guid):
860                 (testbed_guid, factory_id) = data.get_box_data(guid)
861                 if testbed_guid not in blacklist_testbeds:
862                     element_guids.append(guid)
863
864         # replace references to elements labels for its guid
865         self._resolve_labels(data, data_guids, label_guids)
866     
867         # program testbed controllers
868         if element_guids:
869             self._program_testbed_controllers(element_guids, data)
870         
871         return to_recover, to_restart
872
873     def _resolve_labels(self, data, data_guids, label_guids):
874         netrefs = self._netrefs
875         testbed_netrefs = self._testbed_netrefs
876         for guid in data_guids:
877             for name, value in data.get_attribute_data(guid):
878                 if isinstance(value, basestring):
879                     while True:
880                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
881                             label = match.group("label")
882                             if not label.startswith('GUID-'):
883                                 ref_guid = label_guids.get(label)
884                                 if ref_guid is not None:
885                                     value = value.replace(
886                                         match.group(),
887                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
888                                             guid = 'GUID-%d' % (ref_guid,),
889                                             expr = match.group("expr"),
890                                             label = label)
891                                     )
892                                     data.set_attribute_data(guid, name, value)
893                                     
894                                     # memorize which guid-attribute pairs require
895                                     # postprocessing, to avoid excessive controller-testbed
896                                     # communication at configuration time
897                                     # (which could require high-latency network I/O)
898                                     if not data.is_testbed_data(guid):
899                                         (testbed_guid, factory_id) = data.get_box_data(guid)
900                                         netrefs[(testbed_guid, guid)].add(name)
901                                     else:
902                                         testbed_netrefs[guid].add(name)
903                                     
904                                     break
905                         else:
906                             break
907
908     def _create_testbed_controller(self, guid, data, element_guids, recover):
909         (testbed_id, testbed_version) = data.get_testbed_data(guid)
910         deployment_config = self._deployment_config.get(guid)
911         
912         # deferred import because proxy needs
913         # our class definitions to define proxies
914         import nepi.util.proxy as proxy
915         
916         if deployment_config is None:
917             # need to create one
918             deployment_config = proxy.AccessConfiguration()
919             
920             for (name, value) in data.get_attribute_data(guid):
921                 if value is not None and deployment_config.has_attribute(name):
922                     # if any deployment config attribute has a netref, we can't
923                     # create this controller yet
924                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
925                         # remember to re-issue this one
926                         self._netreffed_testbeds.add(guid)
927                         return
928                     
929                     # copy deployment config attribute
930                     deployment_config.set_attribute_value(name, value)
931             
932             # commit config
933             self._deployment_config[guid] = deployment_config
934         
935         if deployment_config is not None:
936             # force recovery mode 
937             deployment_config.set_attribute_value("recover",recover)
938         
939         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
940                 deployment_config)
941         for (name, value) in data.get_attribute_data(guid):
942             testbed.defer_configure(name, value)
943         self._testbeds[guid] = testbed
944         if guid in self._netreffed_testbeds:
945             self._netreffed_testbeds.remove(guid)
946
947     def _program_testbed_controllers(self, element_guids, data):
948         def resolve_create_netref(data, guid, name, value): 
949             # Try to resolve create-time netrefs, if possible
950             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
951                 try:
952                     nuvalue = self.resolve_netref_value(value)
953                 except:
954                     # Any trouble means we're not in shape to resolve the netref yet
955                     nuvalue = None
956                 if nuvalue is not None:
957                     # Only if we succeed we remove the netref deferral entry
958                     value = nuvalue
959                     data.set_attribute_data(guid, name, value)
960                     if (testbed_guid, guid) in self._netrefs:
961                         self._netrefs[(testbed_guid, guid)].discard(name)
962             return value
963
964         for guid in element_guids:
965             (testbed_guid, factory_id) = data.get_box_data(guid)
966             testbed = self._testbeds.get(testbed_guid)
967             if testbed is not None:
968                 # create
969                 testbed.defer_create(guid, factory_id)
970                 # set attributes
971                 for (name, value) in data.get_attribute_data(guid):
972                     value = resolve_create_netref(data, guid, name, value)
973                     testbed.defer_create_set(guid, name, value)
974
975         for guid in element_guids:
976             (testbed_guid, factory_id) = data.get_box_data(guid)
977             testbed = self._testbeds.get(testbed_guid)
978             if testbed is not None:
979                 # traces
980                 for trace_id in data.get_trace_data(guid):
981                     testbed.defer_add_trace(guid, trace_id)
982                 # addresses
983                 for (address, netprefix, broadcast) in data.get_address_data(guid):
984                     if address != None:
985                         testbed.defer_add_address(guid, address, netprefix, 
986                                 broadcast)
987                 # routes
988                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
989                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
990                 # store connections data
991                 for (connector_type_name, other_guid, other_connector_type_name) \
992                         in data.get_connection_data(guid):
993                     (other_testbed_guid, other_factory_id) = data.get_box_data(
994                             other_guid)
995                     if testbed_guid == other_testbed_guid:
996                         # each testbed should take care of enforcing internal
997                         # connection simmetry, so each connection is only
998                         # added in one direction
999                         testbed.defer_connect(guid, connector_type_name, 
1000                                 other_guid, other_connector_type_name)
1001
1002     def _program_testbed_cross_connections(self, data):
1003         data_guids = data.guids
1004         for guid in data_guids: 
1005             if not data.is_testbed_data(guid):
1006                 (testbed_guid, factory_id) = data.get_box_data(guid)
1007                 testbed = self._testbeds.get(testbed_guid)
1008                 if testbed is not None:
1009                     for (connector_type_name, cross_guid, cross_connector_type_name) \
1010                             in data.get_connection_data(guid):
1011                         (testbed_guid, factory_id) = data.get_box_data(guid)
1012                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1013                                 cross_guid)
1014                         if testbed_guid != cross_testbed_guid:
1015                             cross_testbed = self._testbeds[cross_testbed_guid]
1016                             cross_testbed_id = cross_testbed.testbed_id
1017                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1018                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1019                                     cross_connector_type_name)
1020                             # save cross data for later
1021                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1022                                     cross_guid)
1023
1024     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1025         if testbed_guid not in self._cross_data:
1026             self._cross_data[testbed_guid] = dict()
1027         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1028             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1029         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1030
1031     def _get_cross_data(self, testbed_guid):
1032         cross_data = dict()
1033         if not testbed_guid in self._cross_data:
1034             return cross_data
1035
1036         # fetch attribute lists in one batch
1037         attribute_lists = dict()
1038         for cross_testbed_guid, guid_list in \
1039                 self._cross_data[testbed_guid].iteritems():
1040             cross_testbed = self._testbeds[cross_testbed_guid]
1041             for cross_guid in guid_list:
1042                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1043                     cross_testbed.get_attribute_list_deferred(cross_guid)
1044
1045         # fetch attribute values in another batch
1046         for cross_testbed_guid, guid_list in \
1047                 self._cross_data[testbed_guid].iteritems():
1048             cross_data[cross_testbed_guid] = dict()
1049             cross_testbed = self._testbeds[cross_testbed_guid]
1050             for cross_guid in guid_list:
1051                 elem_cross_data = dict(
1052                     _guid = cross_guid,
1053                     _testbed_guid = cross_testbed_guid,
1054                     _testbed_id = cross_testbed.testbed_id,
1055                     _testbed_version = cross_testbed.testbed_version)
1056                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1057                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1058                 for attr_name in attribute_list:
1059                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1060                     elem_cross_data[attr_name] = attr_value
1061         
1062         # undefer all values - we'll have to serialize them probably later
1063         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1064             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1065                 for attr_name, attr_value in elem_cross_data.iteritems():
1066                     elem_cross_data[attr_name] = _undefer(attr_value)
1067         
1068         return cross_data
1069     """
1070 class ExperimentSuite(object):
1071     def __init__(self, experiment_xml, access_config, repetitions,
1072             duration, wait_guids):
1073         self._experiment_xml = experiment_xml
1074         self._access_config = access_config
1075         self._experiments = dict()
1076         self._repetitions = repetitions
1077         self._duration = duration
1078         self._wait_guids = wait_guids
1079         self._current = None
1080         self._status = TS.STATUS_ZERO
1081         self._thread = None
1082
1083     def start(self):
1084         self._status  = TS.STATUS_STARTED
1085         self._thread = threading.Thread(target = self._run_experiment_suite)
1086         self._thread.start()
1087
1088     def shutdown(self):
1089         if self._thread:
1090             self._thread.join()
1091             self._thread = None
1092
1093     def _run_experiment_suite(self):
1094         for i in xrange[0, self.repetitions]:
1095             self._current = i
1096             self._run_one_experiment()
1097
1098     def _run_one_experiment(self):
1099         access_config = proxy.AccessConfiguration()
1100         for attr in self._access_config.attributes:
1101             access_config.set_attribute_value(attr.name, attr.value)
1102         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1103         root_dir = "%s_%d" % (
1104                 access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
1105                 self._current)
1106         access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1107         controller = proxy.create_experiment_controller(self._experiment_xml,
1108                 access_config)
1109         self._experiments[self._current] = controller
1110         controller.start()
1111         started_at = time.time()
1112         # wait until all specified guids have finished execution
1113         if self._wait_guids:
1114             while all(itertools.imap(controller.is_finished, self._wait_guids):
1115                 time.sleep(0.5)
1116         # wait until the minimum experiment duration time has elapsed 
1117         if self._duration:
1118             while (time.time() - started_at) < self._duration:
1119                 time.sleep(0.5)
1120         controller.stop()
1121         #download results!!
1122         controller.shutdown()
1123     """
1124
1125
1126
1127
1128
1129
1130