Merge with head
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16 import logging
17 logging.basicConfig()
18
# Matches a netref such as "{#[label].[Attribute]#}", optionally with a
# component: "{#[label].addr[0].[Address]#}", "{#[label].route[1].[NextHop]#}"
# or "{#[label].trace[trace_id].[attr]#}".  The dot that introduces the
# attribute selector is escaped (like the one introducing the component) so
# only a literal "." is accepted there.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# Template for re-emitting a netref with a substituted label (%(guid)s)
# followed by an expression as captured by the "expr" group above.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a netref component such as "addr[0]" into kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
22
23 def _undefer(deferred):
24     if hasattr(deferred, '_get'):
25         return deferred._get()
26     else:
27         return deferred
28
29
class TestbedController(object):
    """
    Interface implemented by testbed controllers.

    ExperimentController drives a controller in stages: the defer_* methods
    record instructions, which the do_* methods then carry out (do_setup,
    do_create, do_connect_init, do_connect_compl, do_preconfigure,
    do_configure, do_cross_connect_init, do_cross_connect_compl, do_prestart),
    followed by start().  Apart from the constructor and the testbed_id /
    testbed_version properties, every operation here raises
    NotImplementedError and must be overridden.

    NOTE(review): ExperimentController also calls get_deferred() and
    get_attribute_list_deferred() on its testbeds, which this interface does
    not declare -- confirm where those are provided.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """The testbed_id given to the constructor."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """The testbed_version given to the constructor."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements that belong to this testbed."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element ``guid`` from factory ``factory_id``"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and their
        initial attributes are set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Runs after connection and before netref resolution and cross
        connections; useful for early configuration stages, i.e. for setting
        up whatever netref resolution might need.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before start, elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init, initiation of all external connections 
        between elements of different testbeds is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl, completion of all external connections 
        between elements of different testbeds is performed
        """
        raise NotImplementedError

    def start(self):
        """Start the testbed; invoked after do_prestart."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute ``name`` of element ``guid`` to ``value``."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Return attribute ``name`` of element ``guid``."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """
        Return the attribute names of element ``guid``; ``filter_flags`` and
        ``exclude`` presumably select/deselect attributes by flag -- confirm
        against the implementations.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id of element ``guid``."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Perform ``action`` on element ``guid``."""
        raise NotImplementedError

    def status(self, guid):
        """Return the status of element ``guid``."""
        raise NotImplementedError
    
    def testbed_status(self):
        """Return the status of the testbed itself."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return ``attribute`` of trace ``trace_id`` of element ``guid``."""
        raise NotImplementedError

    def traces_info(self):
        """
        Describe the traces of this testbed as a dictionary of dictionaries:

            {
                guid: {
                    trace_id: {
                        'host': host,
                        'filepath': filepath,
                        'filesize': size in bytes,
                    },
                },
            }
        """
        raise NotImplementedError

    def shutdown(self):
        """Shut the testbed down."""
        raise NotImplementedError
228
229 class ExperimentController(object):
230     def __init__(self, experiment_xml, root_dir):
231         self._experiment_design_xml = experiment_xml
232         self._experiment_execute_xml = None
233         self._testbeds = dict()
234         self._deployment_config = dict()
235         self._netrefs = collections.defaultdict(set)
236         self._testbed_netrefs = collections.defaultdict(set)
237         self._cross_data = dict()
238         self._root_dir = root_dir
239         self._netreffed_testbeds = set()
240         self._guids_in_testbed_cache = dict()
241         self._failed_testbeds = set()
242         self._started_time = None
243         self._stopped_time = None
244       
245         self._logger = logging.getLogger('nepi.core.execute')
246         level = logging.ERROR
247         if os.environ.get("NEPI_CONTROLLER_LOGLEVEL", 
248                 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
249             level = logging.DEBUG
250         self._logger.setLevel(level)
251  
252         if experiment_xml is None and root_dir is not None:
253             # Recover
254             self.load_experiment_xml()
255             self.load_execute_xml()
256         else:
257             self.persist_experiment_xml()
258
259     @property
260     def experiment_design_xml(self):
261         return self._experiment_design_xml
262
263     @property
264     def experiment_execute_xml(self):
265         return self._experiment_execute_xml
266
267     @property
268     def started_time(self):
269         return self._started_time
270
271     @property
272     def stopped_time(self):
273         return self._stopped_time
274
275     @property
276     def guids(self):
277         guids = list()
278         for testbed_guid in self._testbeds.keys():
279             _guids = self._guids_in_testbed(testbed_guid)
280             if _guids:
281                 guids.extend(_guids)
282         return guids
283
284     def persist_experiment_xml(self):
285         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
286         f = open(xml_path, "w")
287         f.write(self._experiment_design_xml)
288         f.close()
289
290     def persist_execute_xml(self):
291         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
292         f = open(xml_path, "w")
293         f.write(self._experiment_execute_xml)
294         f.close()
295
296     def load_experiment_xml(self):
297         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
298         f = open(xml_path, "r")
299         self._experiment_design_xml = f.read()
300         f.close()
301
302     def load_execute_xml(self):
303         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
304         f = open(xml_path, "r")
305         self._experiment_execute_xml = f.read()
306         f.close()
307
308     def trace(self, guid, trace_id, attribute='value'):
309         testbed = self._testbed_for_guid(guid)
310         if testbed != None:
311             return testbed.trace(guid, trace_id, attribute)
312         raise RuntimeError("No element exists with guid %d" % guid)    
313
314     def traces_info(self):
315         traces_info = dict()
316         for guid, testbed in self._testbeds.iteritems():
317             tinfo = testbed.traces_info()
318             if tinfo:
319                 traces_info[guid] = testbed.traces_info()
320         return traces_info
321
322     @staticmethod
323     def _parallel(callables):
324         excs = []
325         def wrap(callable):
326             @functools.wraps(callable)
327             def wrapped(*p, **kw):
328                 try:
329                     callable(*p, **kw)
330                 except:
331                     logging.exception("Exception occurred in asynchronous thread:")
332                     excs.append(sys.exc_info())
333             return wrapped
334         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
335         for thread in threads:
336             thread.start()
337         for thread in threads:
338             thread.join()
339         for exc in excs:
340             eTyp, eVal, eLoc = exc
341             raise eTyp, eVal, eLoc
342
343     def start(self):
344         self._started_time = time.time() 
345         self._start()
346
347     def _start(self, recover = False):
348         parser = XmlExperimentParser()
349         
350         if recover:
351             xml = self._experiment_execute_xml
352         else:
353             xml = self._experiment_design_xml
354         data = parser.from_xml_to_data(xml)
355
356         # instantiate testbed controllers
357         to_recover, to_restart = self._init_testbed_controllers(data, recover)
358         all_restart = set(to_restart)
359         
360         if not recover:
361             # persist testbed connection data, for potential recovery
362             self._persist_testbed_proxies()
363         else:
364             # recover recoverable controllers
365             for guid in to_recover:
366                 try:
367                     self._testbeds[guid].do_setup()
368                     self._testbeds[guid].recover()
369                 except:
370                     self._logger.exception("During recovery of testbed %s", guid)
371                     
372                     # Mark failed
373                     self._failed_testbeds.add(guid)
374     
375         def steps_to_configure(self, allowed_guids):
376             # perform setup in parallel for all test beds,
377             # wait for all threads to finish
378
379             self._logger.debug("ExperimentController: Starting parallel do_setup")
380             self._parallel([testbed.do_setup 
381                             for guid,testbed in self._testbeds.iteritems()
382                             if guid in allowed_guids])
383        
384             # perform create-connect in parallel, wait
385             # (internal connections only)
386             self._logger.debug("ExperimentController: Starting parallel do_create")
387             self._parallel([testbed.do_create
388                             for guid,testbed in self._testbeds.iteritems()
389                             if guid in allowed_guids])
390
391             self._logger.debug("ExperimentController: Starting parallel do_connect_init")
392             self._parallel([testbed.do_connect_init
393                             for guid,testbed in self._testbeds.iteritems()
394                             if guid in allowed_guids])
395
396             self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
397             self._parallel([testbed.do_connect_compl
398                             for guid,testbed in self._testbeds.iteritems()
399                             if guid in allowed_guids])
400
401             self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
402             self._parallel([testbed.do_preconfigure
403                             for guid,testbed in self._testbeds.iteritems()
404                             if guid in allowed_guids])
405             self._clear_caches()
406
407         steps_to_configure(self, to_restart)
408
409         if self._netreffed_testbeds:
410             self._logger.debug("ExperimentController: Resolving netreffed testbeds")
411             # initally resolve netrefs
412             self.do_netrefs(data, fail_if_undefined=False)
413             
414             # rinse and repeat, for netreffed testbeds
415             netreffed_testbeds = set(self._netreffed_testbeds)
416
417             to_recover, to_restart = self._init_testbed_controllers(data, recover)
418             all_restart.update(to_restart)
419             
420             if not recover:
421                 # persist testbed connection data, for potential recovery
422                 self._persist_testbed_proxies()
423             else:
424                 # recover recoverable controllers
425                 for guid in to_recover:
426                     try:
427                         self._testbeds[guid].do_setup()
428                         self._testbeds[guid].recover()
429                     except:
430                         self._logger.exception("During recovery of testbed %s", guid)
431
432                         # Mark failed
433                         self._failed_testbeds.add(guid)
434
435             # configure dependant testbeds
436             steps_to_configure(self, to_restart)
437         
438         all_restart = [ self._testbeds[guid] for guid in all_restart ]
439             
440         # final netref step, fail if anything's left unresolved
441         self._logger.debug("ExperimentController: Resolving do_netrefs")
442         self.do_netrefs(data, fail_if_undefined=False)
443        
444         # Only now, that netref dependencies have been solve, it is safe to
445         # program cross_connections
446         self._logger.debug("ExperimentController: Programming testbed cross-connections")
447         self._program_testbed_cross_connections(data)
448  
449         # perform do_configure in parallel for al testbeds
450         # (it's internal configuration for each)
451         self._logger.debug("ExperimentController: Starting parallel do_configure")
452         self._parallel([testbed.do_configure
453                         for testbed in all_restart])
454
455         self._clear_caches()
456
457         #print >>sys.stderr, "DO IT"
458         #import time
459         #time.sleep(60)
460         
461         # cross-connect (cannot be done in parallel)
462         self._logger.debug("ExperimentController: Starting cross-connect")
463         for guid, testbed in self._testbeds.iteritems():
464             cross_data = self._get_cross_data(guid)
465             testbed.do_cross_connect_init(cross_data)
466         for guid, testbed in self._testbeds.iteritems():
467             cross_data = self._get_cross_data(guid)
468             testbed.do_cross_connect_compl(cross_data)
469        
470         self._clear_caches()
471
472         # Last chance to configure (parallel on all testbeds)
473         self._logger.debug("ExperimentController: Starting parallel do_prestart")
474         self._parallel([testbed.do_prestart
475                         for testbed in all_restart])
476
477         # final netref step, fail if anything's left unresolved
478         self.do_netrefs(data, fail_if_undefined=True)
479  
480         self._clear_caches()
481         
482         if not recover:
483             # update execution xml with execution-specific values
484             # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
485             self._update_execute_xml()
486             self.persist_execute_xml()
487
488         # start experiment (parallel start on all testbeds)
489         self._logger.debug("ExperimentController: Starting parallel do_start")
490         self._parallel([testbed.start
491                         for testbed in all_restart])
492
493         self._clear_caches()
494
495     def _clear_caches(self):
496         # Cleaning cache for safety.
497         self._guids_in_testbed_cache = dict()
498
499     def _persist_testbed_proxies(self):
500         TRANSIENT = (DC.RECOVER,)
501         
502         # persist access configuration for all testbeds, so that
503         # recovery mode can reconnect to them if it becomes necessary
504         conf = ConfigParser.RawConfigParser()
505         for testbed_guid, testbed_config in self._deployment_config.iteritems():
506             testbed_guid = str(testbed_guid)
507             conf.add_section(testbed_guid)
508             for attr in testbed_config.get_attribute_list():
509                 if attr not in TRANSIENT:
510                     conf.set(testbed_guid, attr, 
511                         testbed_config.get_attribute_value(attr))
512         
513         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
514         conf.write(f)
515         f.close()
516     
517     def _load_testbed_proxies(self):
518         TYPEMAP = {
519             Attribute.STRING : 'get',
520             Attribute.BOOL : 'getboolean',
521             Attribute.ENUM : 'get',
522             Attribute.DOUBLE : 'getfloat',
523             Attribute.INTEGER : 'getint',
524         }
525         
526         TRANSIENT = (DC.RECOVER,)
527         
528         # deferred import because proxy needs
529         # our class definitions to define proxies
530         import nepi.util.proxy as proxy
531         
532         conf = ConfigParser.RawConfigParser()
533         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
534         for testbed_guid in conf.sections():
535             testbed_config = proxy.AccessConfiguration()
536             testbed_guid = str(testbed_guid)
537             for attr in testbed_config.get_attribute_list():
538                 if attr not in TRANSIENT:
539                     getter = getattr(conf, TYPEMAP.get(
540                         testbed_config.get_attribute_type(attr),
541                         'get') )
542                     testbed_config.set_attribute_value(
543                         attr, getter(testbed_guid, attr))
544     
545     def _unpersist_testbed_proxies(self):
546         try:
547             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
548         except:
549             # Just print exceptions, this is just cleanup
550             self._logger.exception("Loading testbed configuration")
551
552     def _update_execute_xml(self):
553         # For all testbeds,
554         #   For all elements in testbed,
555         #       - gather immutable execute-readable attribuets lists
556         #         asynchronously
557         # Generate new design description from design xml
558         # (Wait for attributes lists - implicit syncpoint)
559         # For all testbeds,
560         #   For all elements in testbed,
561         #       - gather all immutable execute-readable attribute
562         #         values, asynchronously
563         # (Wait for attribute values - implicit syncpoint)
564         # For all testbeds,
565         #   For all elements in testbed,
566         #       - inject non-None values into new design
567         # Generate execute xml from new design
568
569         attribute_lists = dict(
570             (testbed_guid, collections.defaultdict(dict))
571             for testbed_guid in self._testbeds
572         )
573         
574         for testbed_guid, testbed in self._testbeds.iteritems():
575             guids = self._guids_in_testbed(testbed_guid)
576             for guid in guids:
577                 attribute_lists[testbed_guid][guid] = \
578                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
579         
580         parser = XmlExperimentParser()
581         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
582
583         attribute_values = dict(
584             (testbed_guid, collections.defaultdict(dict))
585             for testbed_guid in self._testbeds
586         )
587         
588         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
589             testbed = self._testbeds[testbed_guid]
590             for guid, attribute_list in testbed_attribute_lists.iteritems():
591                 attribute_list = _undefer(attribute_list)
592                 attribute_values[testbed_guid][guid] = dict(
593                     (attribute, testbed.get_deferred(guid, attribute))
594                     for attribute in attribute_list
595                 )
596         
597         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
598             for guid, attribute_values in testbed_attribute_values.iteritems():
599                 for attribute, value in attribute_values.iteritems():
600                     value = _undefer(value)
601                     if value is not None:
602                         execute_data.add_attribute_data(guid, attribute, value)
603         
604         self._experiment_execute_xml = parser.to_xml(data=execute_data)
605
606     def stop(self):
607        for testbed in self._testbeds.values():
608            testbed.stop()
609        self._unpersist_testbed_proxies()
610        self._stopped_time = time.time() 
611    
612     def recover(self):
613         # reload perviously persisted testbed access configurations
614         self._failed_testbeds.clear()
615         self._load_testbed_proxies()
616
617         # re-program testbeds that need recovery
618         self._start(recover = True)
619
620     def is_finished(self, guid):
621         testbed = self._testbed_for_guid(guid)
622         if testbed != None:
623             return testbed.status(guid) == AS.STATUS_FINISHED
624         raise RuntimeError("No element exists with guid %d" % guid)    
625     
626     def _testbed_recovery_policy(self, guid, data = None):
627         if data is None:
628             parser = XmlExperimentParser()
629             data = parser.from_xml_to_data(self._experiment_design_xml)
630         
631         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
632
633     def status(self, guid):
634         if guid in self._testbeds:
635             # guid is a testbed
636             # report testbed status
637             if guid in self._failed_testbeds:
638                 return TS.STATUS_FAILED
639             else:
640                 try:
641                     return self._testbeds[guid].status()
642                 except:
643                     return TS.STATUS_UNRESPONSIVE
644         else:
645             # guid is an element
646             testbed = self._testbed_for_guid(guid)
647             if testbed is not None:
648                 return testbed.status(guid)
649             else:
650                 return AS.STATUS_UNDETERMINED
651
652     def set(self, guid, name, value, time = TIME_NOW):
653         testbed = self._testbed_for_guid(guid)
654         if testbed != None:
655             testbed.set(guid, name, value, time)
656         else:
657             raise RuntimeError("No element exists with guid %d" % guid)    
658
659     def get(self, guid, name, time = TIME_NOW):
660         testbed = self._testbed_for_guid(guid)
661         if testbed != None:
662             return testbed.get(guid, name, time)
663         raise RuntimeError("No element exists with guid %d" % guid)    
664
665     def get_deferred(self, guid, name, time = TIME_NOW):
666         testbed = self._testbed_for_guid(guid)
667         if testbed != None:
668             return testbed.get_deferred(guid, name, time)
669         raise RuntimeError("No element exists with guid %d" % guid)    
670
671     def get_factory_id(self, guid):
672         testbed = self._testbed_for_guid(guid)
673         if testbed != None:
674             return testbed.get_factory_id(guid)
675         raise RuntimeError("No element exists with guid %d" % guid)    
676
677     def get_testbed_id(self, guid):
678         testbed = self._testbed_for_guid(guid)
679         if testbed != None:
680             return testbed.testbed_id
681         raise RuntimeError("No element exists with guid %d" % guid)    
682
683     def get_testbed_version(self, guid):
684         testbed = self._testbed_for_guid(guid)
685         if testbed != None:
686             return testbed.testbed_version
687         raise RuntimeError("No element exists with guid %d" % guid)    
688
689     def shutdown(self):
690         exceptions = list()
691         for testbed in self._testbeds.values():
692             try:
693                 testbed.shutdown()
694             except:
695                 exceptions.append(sys.exc_info())
696         for exc_info in exceptions:
697             raise exc_info[0], exc_info[1], exc_info[2]
698
699     def _testbed_for_guid(self, guid):
700         for testbed_guid in self._testbeds.keys():
701             if guid in self._guids_in_testbed(testbed_guid):
702                 if testbed_guid in self._failed_testbeds:
703                     return None
704                 return self._testbeds[testbed_guid]
705         return None
706
707     def _guids_in_testbed(self, testbed_guid):
708         if testbed_guid not in self._testbeds:
709             return set()
710         if testbed_guid not in self._guids_in_testbed_cache:
711             self._guids_in_testbed_cache[testbed_guid] = \
712                 set(self._testbeds[testbed_guid].guids)
713         return self._guids_in_testbed_cache[testbed_guid]
714
715     @staticmethod
716     def _netref_component_split(component):
717         match = COMPONENT_PATTERN.match(component)
718         if match:
719             return match.group("kind"), match.group("index")
720         else:
721             return component, None
722
723     _NETREF_COMPONENT_GETTERS = {
724         'addr':
725             lambda testbed, guid, index, name: 
726                 testbed.get_address(guid, int(index), name),
727         'route' :
728             lambda testbed, guid, index, name: 
729                 testbed.get_route(guid, int(index), name),
730         'trace' :
731             lambda testbed, guid, index, name: 
732                 testbed.trace(guid, index, attribute = name),
733         '' : 
734             lambda testbed, guid, index, name: 
735                 testbed.get(guid, name),
736     }
737     
    def resolve_netref_value(self, value, failval = None):
        """
        Substitute the "GUID-<n>" netrefs found in the string ``value``.

        Netrefs are located with ATTRIBUTE_PATTERN_BASE.  Only labels of the
        form "GUID-<n>" with a non-zero <n> are resolved; other netrefs are
        skipped and left in place.  A netref's value is fetched with the
        getter registered in _NETREF_COMPONENT_GETTERS for its component kind,
        from the testbed whose guids include <n>.

        Each pass substitutes one netref (every occurrence of its exact text)
        and then rescans the updated string from the start.  This repeats
        until a scan finds no resolvable GUID netref.

        Returns:
            the rewritten string if at least one substitution was made;
            ``failval`` if nothing was substituted, or as soon as one GUID
            netref cannot be resolved (no testbed owning <n> returned a
            truthy value for it).

        Raises:
            ValueError: if <n> is not an integer (from int()), or for an
                unknown component kind.  The latter is only detected while
                iterating over the testbeds, so not when there are none.
        """
        rv = failval
        while True:
            # Each pass ends in one of three ways: a substitution followed by
            # a rescan (break), giving up (return failval), or nothing left
            # to resolve (for-else break).
            for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
                label = match.group("label")
                if label.startswith('GUID-'):
                    ref_guid = int(label[5:])
                    if ref_guid:
                        expr = match.group("expr")
                        component = (match.group("component") or "")[1:] # skip the dot
                        attribute = match.group("attribute")
                        
                        # split compound components into component kind and index
                        # eg: 'addr[0]' -> ('addr', '0')
                        component, component_index = self._netref_component_split(component)

                        # find object and resolve expression
                        for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
                            if component not in self._NETREF_COMPONENT_GETTERS:
                                raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
                            elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
                                pass
                            else:
                                ref_value = self._NETREF_COMPONENT_GETTERS[component](
                                    ref_testbed, ref_guid, component_index, attribute)
                                # falsy values count as "not resolved"
                                if ref_value:
                                    value = rv = value.replace(match.group(), ref_value)
                                    break
                        else:
                            # unresolvable netref
                            return failval
                        # substituted: rescan the updated string from the start
                        break
            else:
                # scan completed without a substitution: nothing left to resolve
                break
        return rv
773     
774     def do_netrefs(self, data, fail_if_undefined = False):
775         # element netrefs
776         for (testbed_guid, guid), attrs in self._netrefs.items():
777             testbed = self._testbeds.get(testbed_guid)
778             if testbed is not None:
779                 for name in set(attrs):
780                     value = testbed.get(guid, name)
781                     if isinstance(value, basestring):
782                         ref_value = self.resolve_netref_value(value)
783                         if ref_value is not None:
784                             testbed.set(guid, name, ref_value)
785                             attrs.remove(name)
786                         elif fail_if_undefined:
787                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
788                 if not attrs:
789                     del self._netrefs[(testbed_guid, guid)]
790         
791         # testbed netrefs
792         for testbed_guid, attrs in self._testbed_netrefs.items():
793             tb_data = dict(data.get_attribute_data(testbed_guid))
794             if data:
795                 for name in set(attrs):
796                     value = tb_data.get(name)
797                     if isinstance(value, basestring):
798                         ref_value = self.resolve_netref_value(value)
799                         if ref_value is not None:
800                             data.set_attribute_data(testbed_guid, name, ref_value)
801                             attrs.remove(name)
802                         elif fail_if_undefined:
803                             raise ValueError, "Unresolvable netref in: %r" % (value,)
804                 if not attrs:
805                     del self._testbed_netrefs[testbed_guid]
806         
807
808     def _init_testbed_controllers(self, data, recover = False):
809         blacklist_testbeds = set(self._testbeds)
810         element_guids = list()
811         label_guids = dict()
812         data_guids = data.guids
813         to_recover = set()
814         to_restart = set()
815
816         # gather label associations
817         for guid in data_guids:
818             if not data.is_testbed_data(guid):
819                 (testbed_guid, factory_id) = data.get_box_data(guid)
820                 label = data.get_attribute_data(guid, "label")
821                 if label is not None:
822                     if label in label_guids:
823                         raise RuntimeError, "Label %r is not unique" % (label,)
824                     label_guids[label] = guid
825
826         # create testbed controllers
827         for guid in data_guids:
828             if data.is_testbed_data(guid):
829                 if guid not in self._testbeds:
830                     try:
831                         self._create_testbed_controller(
832                             guid, data, element_guids, recover)
833                         if recover:
834                             # Already programmed
835                             blacklist_testbeds.add(guid)
836                         else:
837                             to_restart.add(guid)
838                     except:
839                         if recover:
840                             policy = self._testbed_recovery_policy(guid, data=data)
841                             if policy == DC.POLICY_RECOVER:
842                                 self._create_testbed_controller(
843                                     guid, data, element_guids, False)
844                                 to_recover.add(guid)
845                             elif policy == DC.POLICY_RESTART:
846                                 self._create_testbed_controller(
847                                     guid, data, element_guids, False)
848                                 to_restart.add(guid)
849                             else:
850                                 # Mark failed
851                                 self._failed_testbeds.add(guid)
852                         else:
853                             raise
854         
855         # queue programmable elements
856         #  - that have not been programmed already (blacklist_testbeds)
857         #  - including recovered or restarted testbeds
858         #  - but those that have no unresolved netrefs
859         for guid in data_guids:
860             if not data.is_testbed_data(guid):
861                 (testbed_guid, factory_id) = data.get_box_data(guid)
862                 if testbed_guid not in blacklist_testbeds:
863                     element_guids.append(guid)
864
865         # replace references to elements labels for its guid
866         self._resolve_labels(data, data_guids, label_guids)
867     
868         # program testbed controllers
869         if element_guids:
870             self._program_testbed_controllers(element_guids, data)
871         
872         return to_recover, to_restart
873
874     def _resolve_labels(self, data, data_guids, label_guids):
875         netrefs = self._netrefs
876         testbed_netrefs = self._testbed_netrefs
877         for guid in data_guids:
878             for name, value in data.get_attribute_data(guid):
879                 if isinstance(value, basestring):
880                     while True:
881                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
882                             label = match.group("label")
883                             if not label.startswith('GUID-'):
884                                 ref_guid = label_guids.get(label)
885                                 if ref_guid is not None:
886                                     value = value.replace(
887                                         match.group(),
888                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
889                                             guid = 'GUID-%d' % (ref_guid,),
890                                             expr = match.group("expr"),
891                                             label = label)
892                                     )
893                                     data.set_attribute_data(guid, name, value)
894                                     
895                                     # memorize which guid-attribute pairs require
896                                     # postprocessing, to avoid excessive controller-testbed
897                                     # communication at configuration time
898                                     # (which could require high-latency network I/O)
899                                     if not data.is_testbed_data(guid):
900                                         (testbed_guid, factory_id) = data.get_box_data(guid)
901                                         netrefs[(testbed_guid, guid)].add(name)
902                                     else:
903                                         testbed_netrefs[guid].add(name)
904                                     
905                                     break
906                         else:
907                             break
908
909     def _create_testbed_controller(self, guid, data, element_guids, recover):
910         (testbed_id, testbed_version) = data.get_testbed_data(guid)
911         deployment_config = self._deployment_config.get(guid)
912         
913         # deferred import because proxy needs
914         # our class definitions to define proxies
915         import nepi.util.proxy as proxy
916         
917         if deployment_config is None:
918             # need to create one
919             deployment_config = proxy.AccessConfiguration()
920             
921             for (name, value) in data.get_attribute_data(guid):
922                 if value is not None and deployment_config.has_attribute(name):
923                     # if any deployment config attribute has a netref, we can't
924                     # create this controller yet
925                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
926                         # remember to re-issue this one
927                         self._netreffed_testbeds.add(guid)
928                         return
929                     
930                     # copy deployment config attribute
931                     deployment_config.set_attribute_value(name, value)
932             
933             # commit config
934             self._deployment_config[guid] = deployment_config
935         
936         if deployment_config is not None:
937             # force recovery mode 
938             deployment_config.set_attribute_value("recover",recover)
939         
940         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
941                 deployment_config)
942         for (name, value) in data.get_attribute_data(guid):
943             testbed.defer_configure(name, value)
944         self._testbeds[guid] = testbed
945         if guid in self._netreffed_testbeds:
946             self._netreffed_testbeds.remove(guid)
947
948     def _program_testbed_controllers(self, element_guids, data):
949         def resolve_create_netref(data, guid, name, value): 
950             # Try to resolve create-time netrefs, if possible
951             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
952                 try:
953                     nuvalue = self.resolve_netref_value(value)
954                 except:
955                     # Any trouble means we're not in shape to resolve the netref yet
956                     nuvalue = None
957                 if nuvalue is not None:
958                     # Only if we succeed we remove the netref deferral entry
959                     value = nuvalue
960                     data.set_attribute_data(guid, name, value)
961                     if (testbed_guid, guid) in self._netrefs:
962                         self._netrefs[(testbed_guid, guid)].discard(name)
963             return value
964
965         for guid in element_guids:
966             (testbed_guid, factory_id) = data.get_box_data(guid)
967             testbed = self._testbeds.get(testbed_guid)
968             if testbed is not None:
969                 # create
970                 testbed.defer_create(guid, factory_id)
971                 # set attributes
972                 for (name, value) in data.get_attribute_data(guid):
973                     value = resolve_create_netref(data, guid, name, value)
974                     testbed.defer_create_set(guid, name, value)
975
976         for guid in element_guids:
977             (testbed_guid, factory_id) = data.get_box_data(guid)
978             testbed = self._testbeds.get(testbed_guid)
979             if testbed is not None:
980                 # traces
981                 for trace_id in data.get_trace_data(guid):
982                     testbed.defer_add_trace(guid, trace_id)
983                 # addresses
984                 for (address, netprefix, broadcast) in data.get_address_data(guid):
985                     if address != None:
986                         testbed.defer_add_address(guid, address, netprefix, 
987                                 broadcast)
988                 # routes
989                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
990                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
991                 # store connections data
992                 for (connector_type_name, other_guid, other_connector_type_name) \
993                         in data.get_connection_data(guid):
994                     (other_testbed_guid, other_factory_id) = data.get_box_data(
995                             other_guid)
996                     if testbed_guid == other_testbed_guid:
997                         # each testbed should take care of enforcing internal
998                         # connection simmetry, so each connection is only
999                         # added in one direction
1000                         testbed.defer_connect(guid, connector_type_name, 
1001                                 other_guid, other_connector_type_name)
1002
1003     def _program_testbed_cross_connections(self, data):
1004         data_guids = data.guids
1005         for guid in data_guids: 
1006             if not data.is_testbed_data(guid):
1007                 (testbed_guid, factory_id) = data.get_box_data(guid)
1008                 testbed = self._testbeds.get(testbed_guid)
1009                 if testbed is not None:
1010                     for (connector_type_name, cross_guid, cross_connector_type_name) \
1011                             in data.get_connection_data(guid):
1012                         (testbed_guid, factory_id) = data.get_box_data(guid)
1013                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1014                                 cross_guid)
1015                         if testbed_guid != cross_testbed_guid:
1016                             cross_testbed = self._testbeds[cross_testbed_guid]
1017                             cross_testbed_id = cross_testbed.testbed_id
1018                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1019                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1020                                     cross_connector_type_name)
1021                             # save cross data for later
1022                             self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1023                                     (testbed_guid, guid, cross_testbed_guid, cross_guid))
1024                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1025                                     cross_guid)
1026
1027     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1028         if testbed_guid not in self._cross_data:
1029             self._cross_data[testbed_guid] = dict()
1030         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1031             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1032         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1033
1034     def _get_cross_data(self, testbed_guid):
1035         cross_data = dict()
1036         if not testbed_guid in self._cross_data:
1037             return cross_data
1038
1039         # fetch attribute lists in one batch
1040         attribute_lists = dict()
1041         for cross_testbed_guid, guid_list in \
1042                 self._cross_data[testbed_guid].iteritems():
1043             cross_testbed = self._testbeds[cross_testbed_guid]
1044             for cross_guid in guid_list:
1045                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1046                     cross_testbed.get_attribute_list_deferred(cross_guid)
1047
1048         # fetch attribute values in another batch
1049         for cross_testbed_guid, guid_list in \
1050                 self._cross_data[testbed_guid].iteritems():
1051             cross_data[cross_testbed_guid] = dict()
1052             cross_testbed = self._testbeds[cross_testbed_guid]
1053             for cross_guid in guid_list:
1054                 elem_cross_data = dict(
1055                     _guid = cross_guid,
1056                     _testbed_guid = cross_testbed_guid,
1057                     _testbed_id = cross_testbed.testbed_id,
1058                     _testbed_version = cross_testbed.testbed_version)
1059                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1060                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1061                 for attr_name in attribute_list:
1062                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1063                     elem_cross_data[attr_name] = attr_value
1064         
1065         # undefer all values - we'll have to serialize them probably later
1066         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1067             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1068                 for attr_name, attr_value in elem_cross_data.iteritems():
1069                     elem_cross_data[attr_name] = _undefer(attr_value)
1070         
1071         return cross_data
1072     """
1073 class ExperimentSuite(object):
1074     def __init__(self, experiment_xml, access_config, repetitions,
1075             duration, wait_guids):
1076         self._experiment_xml = experiment_xml
1077         self._access_config = access_config
1078         self._experiments = dict()
1079         self._repetitions = repetitions
1080         self._duration = duration
1081         self._wait_guids = wait_guids
1082         self._current = None
1083         self._status = TS.STATUS_ZERO
1084         self._thread = None
1085
1086     def start(self):
1087         self._status  = TS.STATUS_STARTED
1088         self._thread = threading.Thread(target = self._run_experiment_suite)
1089         self._thread.start()
1090
1091     def shutdown(self):
1092         if self._thread:
1093             self._thread.join()
1094             self._thread = None
1095
1096     def _run_experiment_suite(self):
1097         for i in xrange[0, self.repetitions]:
1098             self._current = i
1099             self._run_one_experiment()
1100
1101     def _run_one_experiment(self):
1102         access_config = proxy.AccessConfiguration()
1103         for attr in self._access_config.attributes:
1104             access_config.set_attribute_value(attr.name, attr.value)
1105         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1106         root_dir = "%s_%d" % (
1107                 access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
1108                 self._current)
1109         access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1110         controller = proxy.create_experiment_controller(self._experiment_xml,
1111                 access_config)
1112         self._experiments[self._current] = controller
1113         controller.start()
1114         started_at = time.time()
1115         # wait until all specified guids have finished execution
1116         if self._wait_guids:
1117             while all(itertools.imap(controller.is_finished, self._wait_guids):
1118                 time.sleep(0.5)
1119         # wait until the minimum experiment duration time has elapsed 
1120         if self._duration:
1121             while (time.time() - started_at) < self._duration:
1122                 time.sleep(0.5)
1123         controller.stop()
1124         #download results!!
1125         controller.shutdown()
1126     """
1127
1128
1129
1130
1131
1132
1133