Fix testbed proxy serialization in the presence of missing values (ie: defaults or...
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16 import logging
17 logging.basicConfig()
18
# Matches a netref expression embedded in an attribute value:
#     {#[<label>]<expr>#}
# where <expr> is an optional component selector (".addr[N]", ".route[N]"
# or ".trace[ID]"), one separator character and "[<attribute>]".
# Named groups: label, expr, component, attribute.
# NOTE(review): the separator in front of "[<attribute>]" is an unescaped "."
# and so matches any character, not just a literal dot - confirm whether
# r"\." was intended.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-format template producing a netref expression from a mapping with the
# keys 'guid' and 'expr' (presumably used to rewrite label-based netrefs
# into guid-based ones - not used in the part of the file visible here).
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into its kind ("addr")
# and its index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
22
23 def _undefer(deferred):
24     if hasattr(deferred, '_get'):
25         return deferred._get()
26     else:
27         return deferred
28
29
class TestbedController(object):
    """
    Abstract interface of a testbed controller.

    Only the testbed identification is implemented here; every other
    member raises NotImplementedError and has to be provided by the
    concrete testbed implementation.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids handled by this testbed (to be implemented by subclasses)."""
        raise NotImplementedError

    # -- deferred programming of the testbed --------------------------------

    def defer_configure(self, name, value):
        """Request that a configuration attribute be set on the testbed instance."""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Request the creation of an element."""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Request that an initial attribute value be set on an element."""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Request that an attribute be set on a factory."""
        raise NotImplementedError

    def defer_connect(
            self,
            guid1, connector_type_name1,
            guid2, connector_type_name2):
        """Request a connection between the two given connectors."""
        raise NotImplementedError

    def defer_cross_connect(
            self,
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Request a connection between the given connectors, which belong
        to different testbed instances.
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Request the addition of a trace."""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Request the addition of an address."""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric=0):
        """Request the addition of a route."""
        raise NotImplementedError

    # -- deployment steps ---------------------------------------------------

    def do_setup(self):
        """Once do_setup returns, the initial testbed configuration is done."""
        raise NotImplementedError

    def do_create(self):
        """
        Once do_create returns, every requested element has been created
        and its attributes have been set.
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        Once do_connect_init returns, all connections internal to the
        testbed have been initiated.
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        Once do_connect_compl returns, all connections internal to the
        testbed have been completed.
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Runs after connection and before cross connections, right before
        netrefs are resolved. Meant for early configuration, such as
        setting up whatever netref resolution might require.
        """
        raise NotImplementedError

    def do_configure(self):
        """Once do_configure returns, the elements are configured."""
        raise NotImplementedError

    def do_prestart(self):
        """Prestart configuration of the elements, performed before starting."""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        Once do_cross_connect_init returns, all connections towards
        elements of other testbeds have been initiated.
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        Once do_cross_connect_compl returns, all connections towards
        elements of other testbeds have been completed.
        """
        raise NotImplementedError

    # -- life cycle ---------------------------------------------------------

    def start(self):
        raise NotImplementedError

    def stop(self):
        raise NotImplementedError

    def recover(self):
        """
        Recover the testbed, when recovery is a supported policy.

        On recovery the controller instance is created anew and the
        following sequence is invoked:

            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>

        Start is not called; after the cross connection invocations the
        testbed is expected to be fully functional again.
        """
        raise NotImplementedError

    # -- runtime access -----------------------------------------------------

    def set(self, guid, name, value, time=TIME_NOW):
        raise NotImplementedError

    def get(self, guid, name, time=TIME_NOW):
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of the box to query
            index: number of the routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of the box to query
            index: number of the interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags=None, exclude=False):
        raise NotImplementedError

    def get_factory_id(self, guid):
        raise NotImplementedError

    def action(self, time, guid, action):
        raise NotImplementedError

    def status(self, guid):
        raise NotImplementedError

    def testbed_status(self):
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        raise NotImplementedError

    def traces_info(self):
        """
        Dictionary of dictionaries describing the traces:

            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })
        """
        raise NotImplementedError

    def shutdown(self):
        raise NotImplementedError
228
229 class ExperimentController(object):
230     def __init__(self, experiment_xml, root_dir):
231         self._experiment_design_xml = experiment_xml
232         self._experiment_execute_xml = None
233         self._testbeds = dict()
234         self._deployment_config = dict()
235         self._netrefs = collections.defaultdict(set)
236         self._testbed_netrefs = collections.defaultdict(set)
237         self._cross_data = dict()
238         self._root_dir = root_dir
239         self._netreffed_testbeds = set()
240         self._guids_in_testbed_cache = dict()
241         self._failed_testbeds = set()
242         self._started_time = None
243         self._stopped_time = None
244       
245         self._logger = logging.getLogger('nepi.core.execute')
246         level = logging.ERROR
247         if os.environ.get("NEPI_CONTROLLER_LOGLEVEL", 
248                 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
249             level = logging.DEBUG
250         self._logger.setLevel(level)
251  
252         if experiment_xml is None and root_dir is not None:
253             # Recover
254             self.load_experiment_xml()
255             self.load_execute_xml()
256         else:
257             self.persist_experiment_xml()
258
    @property
    def experiment_design_xml(self):
        """Design xml, as given at construction or loaded by load_experiment_xml."""
        return self._experiment_design_xml
262
    @property
    def experiment_execute_xml(self):
        """
        Execute xml; None until it is generated by _update_execute_xml
        or loaded by load_execute_xml.
        """
        return self._experiment_execute_xml
266
    @property
    def started_time(self):
        """time.time() timestamp recorded by start(); None before that."""
        return self._started_time
270
    @property
    def stopped_time(self):
        """time.time() timestamp recorded by stop(); None before that."""
        return self._stopped_time
274
275     @property
276     def guids(self):
277         guids = list()
278         for testbed_guid in self._testbeds.keys():
279             _guids = self._guids_in_testbed(testbed_guid)
280             if _guids:
281                 guids.extend(_guids)
282         return guids
283
284     def persist_experiment_xml(self):
285         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
286         f = open(xml_path, "w")
287         f.write(self._experiment_design_xml)
288         f.close()
289
290     def persist_execute_xml(self):
291         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
292         f = open(xml_path, "w")
293         f.write(self._experiment_execute_xml)
294         f.close()
295
296     def load_experiment_xml(self):
297         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
298         f = open(xml_path, "r")
299         self._experiment_design_xml = f.read()
300         f.close()
301
302     def load_execute_xml(self):
303         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
304         f = open(xml_path, "r")
305         self._experiment_execute_xml = f.read()
306         f.close()
307
308     def trace(self, guid, trace_id, attribute='value'):
309         testbed = self._testbed_for_guid(guid)
310         if testbed != None:
311             return testbed.trace(guid, trace_id, attribute)
312         raise RuntimeError("No element exists with guid %d" % guid)    
313
314     def traces_info(self):
315         traces_info = dict()
316         for guid, testbed in self._testbeds.iteritems():
317             tinfo = testbed.traces_info()
318             if tinfo:
319                 traces_info[guid] = testbed.traces_info()
320         return traces_info
321
322     @staticmethod
323     def _parallel(callables):
324         excs = []
325         def wrap(callable):
326             @functools.wraps(callable)
327             def wrapped(*p, **kw):
328                 try:
329                     callable(*p, **kw)
330                 except:
331                     logging.exception("Exception occurred in asynchronous thread:")
332                     excs.append(sys.exc_info())
333             return wrapped
334         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
335         for thread in threads:
336             thread.start()
337         for thread in threads:
338             thread.join()
339         for exc in excs:
340             eTyp, eVal, eLoc = exc
341             raise eTyp, eVal, eLoc
342
    def start(self):
        """Record the start time and deploy and start the experiment from its design xml."""
        self._started_time = time.time() 
        self._start()
346
    def _start(self, recover = False):
        """
        Deploy every testbed of the experiment and start it.

        With recover set, the execute xml is used instead of the design
        xml, recoverable testbeds get do_setup() and recover() invoked
        (testbeds failing to do so are marked as failed), and neither the
        testbed proxies nor the execute xml are persisted again.

        Testbeds whose configuration depends on netrefs are instantiated
        in a second round, after a first attempt at resolving netrefs.
        """
        parser = XmlExperimentParser()
        
        if recover:
            xml = self._experiment_execute_xml
        else:
            xml = self._experiment_design_xml
        data = parser.from_xml_to_data(xml)

        # instantiate testbed controllers
        to_recover, to_restart = self._init_testbed_controllers(data, recover)
        all_restart = set(to_restart)
        
        if not recover:
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()
        else:
            # recover recoverable controllers
            for guid in to_recover:
                try:
                    self._testbeds[guid].do_setup()
                    self._testbeds[guid].recover()
                except:
                    self._logger.exception("During recovery of testbed %s", guid)
                    
                    # Mark failed
                    self._failed_testbeds.add(guid)
    
        def steps_to_configure(self, allowed_guids):
            # Runs the deployment steps up to do_preconfigure, each of them
            # in parallel over the testbeds listed in allowed_guids, waiting
            # for all threads to finish before moving on to the next step.

            # perform setup in parallel for all test beds,
            # wait for all threads to finish

            self._logger.debug("ExperimentController: Starting parallel do_setup")
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._logger.debug("ExperimentController: Starting parallel do_create")
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_connect_init")
            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()

        steps_to_configure(self, to_restart)

        if self._netreffed_testbeds:
            self._logger.debug("ExperimentController: Resolving netreffed testbeds")
            # initially resolve netrefs
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            # NOTE(review): this local copy is not used afterwards
            netreffed_testbeds = set(self._netreffed_testbeds)

            to_recover, to_restart = self._init_testbed_controllers(data, recover)
            all_restart.update(to_restart)
            
            if not recover:
                # persist testbed connection data, for potential recovery
                self._persist_testbed_proxies()
            else:
                # recover recoverable controllers
                for guid in to_recover:
                    try:
                        self._testbeds[guid].do_setup()
                        self._testbeds[guid].recover()
                    except:
                        self._logger.exception("During recovery of testbed %s", guid)

                        # Mark failed
                        self._failed_testbeds.add(guid)

            # configure dependent testbeds
            steps_to_configure(self, to_restart)
        
        # from here on, all_restart holds the controllers rather than their guids
        all_restart = [ self._testbeds[guid] for guid in all_restart ]
            
        # intermediate netref step: unresolved netrefs are still tolerated
        # here, the failing pass happens after do_prestart
        self._logger.debug("ExperimentController: Resolving do_netrefs")
        self.do_netrefs(data, fail_if_undefined=False)
       
        # Only now, that netref dependencies have been solved, it is safe to
        # program cross_connections
        self._logger.debug("ExperimentController: Programming testbed cross-connections")
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._logger.debug("ExperimentController: Starting parallel do_configure")
        self._parallel([testbed.do_configure
                        for testbed in all_restart])

        self._clear_caches()

        # cross-connect (cannot be done in parallel)
        self._logger.debug("ExperimentController: Starting cross-connect")
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._logger.debug("ExperimentController: Starting parallel do_prestart")
        self._parallel([testbed.do_prestart
                        for testbed in all_restart])

        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
 
        self._clear_caches()
        
        if not recover:
            # update execution xml with execution-specific values
            # TODO: known bug - the serialization cannot cope with every
            # attribute value (e.g. tun_key, which is non-ASCII)
            self._update_execute_xml()
            self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._logger.debug("ExperimentController: Starting parallel do_start")
        self._parallel([testbed.start
                        for testbed in all_restart])

        self._clear_caches()
494
495     def _clear_caches(self):
496         # Cleaning cache for safety.
497         self._guids_in_testbed_cache = dict()
498
499     def _persist_testbed_proxies(self):
500         TRANSIENT = (DC.RECOVER,)
501         
502         # persist access configuration for all testbeds, so that
503         # recovery mode can reconnect to them if it becomes necessary
504         conf = ConfigParser.RawConfigParser()
505         for testbed_guid, testbed_config in self._deployment_config.iteritems():
506             testbed_guid = str(testbed_guid)
507             conf.add_section(testbed_guid)
508             for attr in testbed_config.get_attribute_list():
509                 if attr not in TRANSIENT:
510                     value = testbed_config.get_attribute_value(attr)
511                     if value is not None:
512                         conf.set(testbed_guid, attr, value)
513         
514         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
515         conf.write(f)
516         f.close()
517     
518     def _load_testbed_proxies(self):
519         TYPEMAP = {
520             Attribute.STRING : 'get',
521             Attribute.BOOL : 'getboolean',
522             Attribute.ENUM : 'get',
523             Attribute.DOUBLE : 'getfloat',
524             Attribute.INTEGER : 'getint',
525         }
526         
527         TRANSIENT = (DC.RECOVER,)
528         
529         # deferred import because proxy needs
530         # our class definitions to define proxies
531         import nepi.util.proxy as proxy
532         
533         conf = ConfigParser.RawConfigParser()
534         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
535         for testbed_guid in conf.sections():
536             testbed_config = proxy.AccessConfiguration()
537             testbed_guid = str(testbed_guid)
538             for attr in testbed_config.get_attribute_list():
539                 if attr not in TRANSIENT:
540                     getter = getattr(conf, TYPEMAP.get(
541                         testbed_config.get_attribute_type(attr),
542                         'get') )
543                     try:
544                         value = getter(testbed_guid, attr)
545                         testbed_config.set_attribute_value(attr, value)
546                     except ConfigParser.NoOptionError:
547                         # Leave default
548                         pass
549     
550     def _unpersist_testbed_proxies(self):
551         try:
552             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
553         except:
554             # Just print exceptions, this is just cleanup
555             self._logger.exception("Loading testbed configuration")
556
557     def _update_execute_xml(self):
558         # For all testbeds,
559         #   For all elements in testbed,
560         #       - gather immutable execute-readable attribuets lists
561         #         asynchronously
562         # Generate new design description from design xml
563         # (Wait for attributes lists - implicit syncpoint)
564         # For all testbeds,
565         #   For all elements in testbed,
566         #       - gather all immutable execute-readable attribute
567         #         values, asynchronously
568         # (Wait for attribute values - implicit syncpoint)
569         # For all testbeds,
570         #   For all elements in testbed,
571         #       - inject non-None values into new design
572         # Generate execute xml from new design
573
574         attribute_lists = dict(
575             (testbed_guid, collections.defaultdict(dict))
576             for testbed_guid in self._testbeds
577         )
578         
579         for testbed_guid, testbed in self._testbeds.iteritems():
580             guids = self._guids_in_testbed(testbed_guid)
581             for guid in guids:
582                 attribute_lists[testbed_guid][guid] = \
583                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
584         
585         parser = XmlExperimentParser()
586         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
587
588         attribute_values = dict(
589             (testbed_guid, collections.defaultdict(dict))
590             for testbed_guid in self._testbeds
591         )
592         
593         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
594             testbed = self._testbeds[testbed_guid]
595             for guid, attribute_list in testbed_attribute_lists.iteritems():
596                 attribute_list = _undefer(attribute_list)
597                 attribute_values[testbed_guid][guid] = dict(
598                     (attribute, testbed.get_deferred(guid, attribute))
599                     for attribute in attribute_list
600                 )
601         
602         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
603             for guid, attribute_values in testbed_attribute_values.iteritems():
604                 for attribute, value in attribute_values.iteritems():
605                     value = _undefer(value)
606                     if value is not None:
607                         execute_data.add_attribute_data(guid, attribute, value)
608         
609         self._experiment_execute_xml = parser.to_xml(data=execute_data)
610
611     def stop(self):
612        for testbed in self._testbeds.values():
613            testbed.stop()
614        self._unpersist_testbed_proxies()
615        self._stopped_time = time.time() 
616    
    def recover(self):
        """
        Recover the experiment: forget previously failed testbeds, reload
        the persisted testbed access configurations and run the start
        sequence in recovery mode.
        """
        # reload previously persisted testbed access configurations
        self._failed_testbeds.clear()
        self._load_testbed_proxies()

        # re-program testbeds that need recovery
        self._start(recover = True)
624
625     def is_finished(self, guid):
626         testbed = self._testbed_for_guid(guid)
627         if testbed != None:
628             return testbed.status(guid) == AS.STATUS_FINISHED
629         raise RuntimeError("No element exists with guid %d" % guid)    
630     
631     def _testbed_recovery_policy(self, guid, data = None):
632         if data is None:
633             parser = XmlExperimentParser()
634             data = parser.from_xml_to_data(self._experiment_design_xml)
635         
636         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
637
638     def status(self, guid):
639         if guid in self._testbeds:
640             # guid is a testbed
641             # report testbed status
642             if guid in self._failed_testbeds:
643                 return TS.STATUS_FAILED
644             else:
645                 try:
646                     return self._testbeds[guid].status()
647                 except:
648                     return TS.STATUS_UNRESPONSIVE
649         else:
650             # guid is an element
651             testbed = self._testbed_for_guid(guid)
652             if testbed is not None:
653                 return testbed.status(guid)
654             else:
655                 return AS.STATUS_UNDETERMINED
656
657     def set(self, guid, name, value, time = TIME_NOW):
658         testbed = self._testbed_for_guid(guid)
659         if testbed != None:
660             testbed.set(guid, name, value, time)
661         else:
662             raise RuntimeError("No element exists with guid %d" % guid)    
663
664     def get(self, guid, name, time = TIME_NOW):
665         testbed = self._testbed_for_guid(guid)
666         if testbed != None:
667             return testbed.get(guid, name, time)
668         raise RuntimeError("No element exists with guid %d" % guid)    
669
670     def get_deferred(self, guid, name, time = TIME_NOW):
671         testbed = self._testbed_for_guid(guid)
672         if testbed != None:
673             return testbed.get_deferred(guid, name, time)
674         raise RuntimeError("No element exists with guid %d" % guid)    
675
676     def get_factory_id(self, guid):
677         testbed = self._testbed_for_guid(guid)
678         if testbed != None:
679             return testbed.get_factory_id(guid)
680         raise RuntimeError("No element exists with guid %d" % guid)    
681
682     def get_testbed_id(self, guid):
683         testbed = self._testbed_for_guid(guid)
684         if testbed != None:
685             return testbed.testbed_id
686         raise RuntimeError("No element exists with guid %d" % guid)    
687
688     def get_testbed_version(self, guid):
689         testbed = self._testbed_for_guid(guid)
690         if testbed != None:
691             return testbed.testbed_version
692         raise RuntimeError("No element exists with guid %d" % guid)    
693
694     def shutdown(self):
695         exceptions = list()
696         for testbed in self._testbeds.values():
697             try:
698                 testbed.shutdown()
699             except:
700                 exceptions.append(sys.exc_info())
701         for exc_info in exceptions:
702             raise exc_info[0], exc_info[1], exc_info[2]
703
704     def _testbed_for_guid(self, guid):
705         for testbed_guid in self._testbeds.keys():
706             if guid in self._guids_in_testbed(testbed_guid):
707                 if testbed_guid in self._failed_testbeds:
708                     return None
709                 return self._testbeds[testbed_guid]
710         return None
711
712     def _guids_in_testbed(self, testbed_guid):
713         if testbed_guid not in self._testbeds:
714             return set()
715         if testbed_guid not in self._guids_in_testbed_cache:
716             self._guids_in_testbed_cache[testbed_guid] = \
717                 set(self._testbeds[testbed_guid].guids)
718         return self._guids_in_testbed_cache[testbed_guid]
719
720     @staticmethod
721     def _netref_component_split(component):
722         match = COMPONENT_PATTERN.match(component)
723         if match:
724             return match.group("kind"), match.group("index")
725         else:
726             return component, None
727
    # Maps the kind of a netref component to a callable taking
    # (testbed, guid, index, name) and fetching the referenced value from
    # the testbed. The '' entry handles netrefs without a component, which
    # refer to a plain attribute of the element and ignore the index.
    _NETREF_COMPONENT_GETTERS = {
        'addr':
            lambda testbed, guid, index, name: 
                testbed.get_address(guid, int(index), name),
        'route' :
            lambda testbed, guid, index, name: 
                testbed.get_route(guid, int(index), name),
        'trace' :
            lambda testbed, guid, index, name: 
                testbed.trace(guid, index, attribute = name),
        '' : 
            lambda testbed, guid, index, name: 
                testbed.get(guid, name),
    }
742     
    def resolve_netref_value(self, value, failval = None):
        """
        Substitute the netref expressions found in value by the values
        they refer to.

        Only netrefs whose label has the form 'GUID-<n>', with a non-zero
        n, are considered; any other match is skipped. After every
        successful substitution the resulting string is scanned again
        from the beginning.

        Returns the string with the substitutions applied, or failval
        when nothing was substituted or when a considered netref could
        not be resolved (even if earlier substitutions succeeded).

        Raises ValueError for a netref with an unknown component kind
        (the check is only performed while iterating over the testbeds).
        """
        rv = failval
        while True:
            for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
                label = match.group("label")
                if label.startswith('GUID-'):
                    ref_guid = int(label[5:])
                    if ref_guid:
                        expr = match.group("expr")
                        component = (match.group("component") or "")[1:] # skip the dot
                        attribute = match.group("attribute")
                        
                        # split compound components into component kind and index
                        # eg: 'addr[0]' -> ('addr', '0')
                        component, component_index = self._netref_component_split(component)

                        # find object and resolve expression
                        for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
                            if component not in self._NETREF_COMPONENT_GETTERS:
                                raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
                            elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
                                pass
                            else:
                                ref_value = self._NETREF_COMPONENT_GETTERS[component](
                                    ref_testbed, ref_guid, component_index, attribute)
                                # a falsy value counts as "not resolved yet"
                                # NOTE(review): str.replace requires ref_value to
                                # be a string - confirm the getters never return
                                # other truthy types
                                if ref_value:
                                    # replaces every occurrence of this netref
                                    value = rv = value.replace(match.group(), ref_value)
                                    break
                        else:
                            # unresolvable netref
                            return failval
                        # value changed: leave the scan and start over
                        break
            else:
                # scan completed without substitutions: nothing left to do
                break
        return rv
778     
779     def do_netrefs(self, data, fail_if_undefined = False):
780         # element netrefs
781         for (testbed_guid, guid), attrs in self._netrefs.items():
782             testbed = self._testbeds.get(testbed_guid)
783             if testbed is not None:
784                 for name in set(attrs):
785                     value = testbed.get(guid, name)
786                     if isinstance(value, basestring):
787                         ref_value = self.resolve_netref_value(value)
788                         if ref_value is not None:
789                             testbed.set(guid, name, ref_value)
790                             attrs.remove(name)
791                         elif fail_if_undefined:
792                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
793                 if not attrs:
794                     del self._netrefs[(testbed_guid, guid)]
795         
796         # testbed netrefs
797         for testbed_guid, attrs in self._testbed_netrefs.items():
798             tb_data = dict(data.get_attribute_data(testbed_guid))
799             if data:
800                 for name in set(attrs):
801                     value = tb_data.get(name)
802                     if isinstance(value, basestring):
803                         ref_value = self.resolve_netref_value(value)
804                         if ref_value is not None:
805                             data.set_attribute_data(testbed_guid, name, ref_value)
806                             attrs.remove(name)
807                         elif fail_if_undefined:
808                             raise ValueError, "Unresolvable netref in: %r" % (value,)
809                 if not attrs:
810                     del self._testbed_netrefs[testbed_guid]
811         
812
813     def _init_testbed_controllers(self, data, recover = False):
814         blacklist_testbeds = set(self._testbeds)
815         element_guids = list()
816         label_guids = dict()
817         data_guids = data.guids
818         to_recover = set()
819         to_restart = set()
820
821         # gather label associations
822         for guid in data_guids:
823             if not data.is_testbed_data(guid):
824                 (testbed_guid, factory_id) = data.get_box_data(guid)
825                 label = data.get_attribute_data(guid, "label")
826                 if label is not None:
827                     if label in label_guids:
828                         raise RuntimeError, "Label %r is not unique" % (label,)
829                     label_guids[label] = guid
830
831         # create testbed controllers
832         for guid in data_guids:
833             if data.is_testbed_data(guid):
834                 if guid not in self._testbeds:
835                     try:
836                         self._create_testbed_controller(
837                             guid, data, element_guids, recover)
838                         if recover:
839                             # Already programmed
840                             blacklist_testbeds.add(guid)
841                         else:
842                             to_restart.add(guid)
843                     except:
844                         if recover:
845                             policy = self._testbed_recovery_policy(guid, data=data)
846                             if policy == DC.POLICY_RECOVER:
847                                 self._create_testbed_controller(
848                                     guid, data, element_guids, False)
849                                 to_recover.add(guid)
850                             elif policy == DC.POLICY_RESTART:
851                                 self._create_testbed_controller(
852                                     guid, data, element_guids, False)
853                                 to_restart.add(guid)
854                             else:
855                                 # Mark failed
856                                 self._failed_testbeds.add(guid)
857                         else:
858                             raise
859         
860         # queue programmable elements
861         #  - that have not been programmed already (blacklist_testbeds)
862         #  - including recovered or restarted testbeds
863         #  - but those that have no unresolved netrefs
864         for guid in data_guids:
865             if not data.is_testbed_data(guid):
866                 (testbed_guid, factory_id) = data.get_box_data(guid)
867                 if testbed_guid not in blacklist_testbeds:
868                     element_guids.append(guid)
869
870         # replace references to elements labels for its guid
871         self._resolve_labels(data, data_guids, label_guids)
872     
873         # program testbed controllers
874         if element_guids:
875             self._program_testbed_controllers(element_guids, data)
876         
877         return to_recover, to_restart
878
879     def _resolve_labels(self, data, data_guids, label_guids):
880         netrefs = self._netrefs
881         testbed_netrefs = self._testbed_netrefs
882         for guid in data_guids:
883             for name, value in data.get_attribute_data(guid):
884                 if isinstance(value, basestring):
885                     while True:
886                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
887                             label = match.group("label")
888                             if not label.startswith('GUID-'):
889                                 ref_guid = label_guids.get(label)
890                                 if ref_guid is not None:
891                                     value = value.replace(
892                                         match.group(),
893                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
894                                             guid = 'GUID-%d' % (ref_guid,),
895                                             expr = match.group("expr"),
896                                             label = label)
897                                     )
898                                     data.set_attribute_data(guid, name, value)
899                                     
900                                     # memorize which guid-attribute pairs require
901                                     # postprocessing, to avoid excessive controller-testbed
902                                     # communication at configuration time
903                                     # (which could require high-latency network I/O)
904                                     if not data.is_testbed_data(guid):
905                                         (testbed_guid, factory_id) = data.get_box_data(guid)
906                                         netrefs[(testbed_guid, guid)].add(name)
907                                     else:
908                                         testbed_netrefs[guid].add(name)
909                                     
910                                     break
911                         else:
912                             break
913
914     def _create_testbed_controller(self, guid, data, element_guids, recover):
915         (testbed_id, testbed_version) = data.get_testbed_data(guid)
916         deployment_config = self._deployment_config.get(guid)
917         
918         # deferred import because proxy needs
919         # our class definitions to define proxies
920         import nepi.util.proxy as proxy
921         
922         if deployment_config is None:
923             # need to create one
924             deployment_config = proxy.AccessConfiguration()
925             
926             for (name, value) in data.get_attribute_data(guid):
927                 if value is not None and deployment_config.has_attribute(name):
928                     # if any deployment config attribute has a netref, we can't
929                     # create this controller yet
930                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
931                         # remember to re-issue this one
932                         self._netreffed_testbeds.add(guid)
933                         return
934                     
935                     # copy deployment config attribute
936                     deployment_config.set_attribute_value(name, value)
937             
938             # commit config
939             self._deployment_config[guid] = deployment_config
940         
941         if deployment_config is not None:
942             # force recovery mode 
943             deployment_config.set_attribute_value("recover",recover)
944         
945         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
946                 deployment_config)
947         for (name, value) in data.get_attribute_data(guid):
948             testbed.defer_configure(name, value)
949         self._testbeds[guid] = testbed
950         if guid in self._netreffed_testbeds:
951             self._netreffed_testbeds.remove(guid)
952
953     def _program_testbed_controllers(self, element_guids, data):
954         def resolve_create_netref(data, guid, name, value): 
955             # Try to resolve create-time netrefs, if possible
956             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
957                 try:
958                     nuvalue = self.resolve_netref_value(value)
959                 except:
960                     # Any trouble means we're not in shape to resolve the netref yet
961                     nuvalue = None
962                 if nuvalue is not None:
963                     # Only if we succeed we remove the netref deferral entry
964                     value = nuvalue
965                     data.set_attribute_data(guid, name, value)
966                     if (testbed_guid, guid) in self._netrefs:
967                         self._netrefs[(testbed_guid, guid)].discard(name)
968             return value
969
970         for guid in element_guids:
971             (testbed_guid, factory_id) = data.get_box_data(guid)
972             testbed = self._testbeds.get(testbed_guid)
973             if testbed is not None:
974                 # create
975                 testbed.defer_create(guid, factory_id)
976                 # set attributes
977                 for (name, value) in data.get_attribute_data(guid):
978                     value = resolve_create_netref(data, guid, name, value)
979                     testbed.defer_create_set(guid, name, value)
980
981         for guid in element_guids:
982             (testbed_guid, factory_id) = data.get_box_data(guid)
983             testbed = self._testbeds.get(testbed_guid)
984             if testbed is not None:
985                 # traces
986                 for trace_id in data.get_trace_data(guid):
987                     testbed.defer_add_trace(guid, trace_id)
988                 # addresses
989                 for (address, netprefix, broadcast) in data.get_address_data(guid):
990                     if address != None:
991                         testbed.defer_add_address(guid, address, netprefix, 
992                                 broadcast)
993                 # routes
994                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
995                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
996                 # store connections data
997                 for (connector_type_name, other_guid, other_connector_type_name) \
998                         in data.get_connection_data(guid):
999                     (other_testbed_guid, other_factory_id) = data.get_box_data(
1000                             other_guid)
1001                     if testbed_guid == other_testbed_guid:
1002                         # each testbed should take care of enforcing internal
1003                         # connection simmetry, so each connection is only
1004                         # added in one direction
1005                         testbed.defer_connect(guid, connector_type_name, 
1006                                 other_guid, other_connector_type_name)
1007
1008     def _program_testbed_cross_connections(self, data):
1009         data_guids = data.guids
1010         for guid in data_guids: 
1011             if not data.is_testbed_data(guid):
1012                 (testbed_guid, factory_id) = data.get_box_data(guid)
1013                 testbed = self._testbeds.get(testbed_guid)
1014                 if testbed is not None:
1015                     for (connector_type_name, cross_guid, cross_connector_type_name) \
1016                             in data.get_connection_data(guid):
1017                         (testbed_guid, factory_id) = data.get_box_data(guid)
1018                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1019                                 cross_guid)
1020                         if testbed_guid != cross_testbed_guid:
1021                             cross_testbed = self._testbeds[cross_testbed_guid]
1022                             cross_testbed_id = cross_testbed.testbed_id
1023                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1024                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1025                                     cross_connector_type_name)
1026                             # save cross data for later
1027                             self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1028                                     (testbed_guid, guid, cross_testbed_guid, cross_guid))
1029                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1030                                     cross_guid)
1031
1032     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1033         if testbed_guid not in self._cross_data:
1034             self._cross_data[testbed_guid] = dict()
1035         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1036             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1037         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1038
1039     def _get_cross_data(self, testbed_guid):
1040         cross_data = dict()
1041         if not testbed_guid in self._cross_data:
1042             return cross_data
1043
1044         # fetch attribute lists in one batch
1045         attribute_lists = dict()
1046         for cross_testbed_guid, guid_list in \
1047                 self._cross_data[testbed_guid].iteritems():
1048             cross_testbed = self._testbeds[cross_testbed_guid]
1049             for cross_guid in guid_list:
1050                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1051                     cross_testbed.get_attribute_list_deferred(cross_guid)
1052
1053         # fetch attribute values in another batch
1054         for cross_testbed_guid, guid_list in \
1055                 self._cross_data[testbed_guid].iteritems():
1056             cross_data[cross_testbed_guid] = dict()
1057             cross_testbed = self._testbeds[cross_testbed_guid]
1058             for cross_guid in guid_list:
1059                 elem_cross_data = dict(
1060                     _guid = cross_guid,
1061                     _testbed_guid = cross_testbed_guid,
1062                     _testbed_id = cross_testbed.testbed_id,
1063                     _testbed_version = cross_testbed.testbed_version)
1064                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1065                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1066                 for attr_name in attribute_list:
1067                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1068                     elem_cross_data[attr_name] = attr_value
1069         
1070         # undefer all values - we'll have to serialize them probably later
1071         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1072             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1073                 for attr_name, attr_value in elem_cross_data.iteritems():
1074                     elem_cross_data[attr_name] = _undefer(attr_value)
1075         
1076         return cross_data
1077     """
1078 class ExperimentSuite(object):
1079     def __init__(self, experiment_xml, access_config, repetitions,
1080             duration, wait_guids):
1081         self._experiment_xml = experiment_xml
1082         self._access_config = access_config
1083         self._experiments = dict()
1084         self._repetitions = repetitions
1085         self._duration = duration
1086         self._wait_guids = wait_guids
1087         self._current = None
1088         self._status = TS.STATUS_ZERO
1089         self._thread = None
1090
1091     def start(self):
1092         self._status  = TS.STATUS_STARTED
1093         self._thread = threading.Thread(target = self._run_experiment_suite)
1094         self._thread.start()
1095
1096     def shutdown(self):
1097         if self._thread:
1098             self._thread.join()
1099             self._thread = None
1100
1101     def _run_experiment_suite(self):
1102         for i in xrange[0, self.repetitions]:
1103             self._current = i
1104             self._run_one_experiment()
1105
1106     def _run_one_experiment(self):
1107         access_config = proxy.AccessConfiguration()
1108         for attr in self._access_config.attributes:
1109             access_config.set_attribute_value(attr.name, attr.value)
1110         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1111         root_dir = "%s_%d" % (
1112                 access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
1113                 self._current)
1114         access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1115         controller = proxy.create_experiment_controller(self._experiment_xml,
1116                 access_config)
1117         self._experiments[self._current] = controller
1118         controller.start()
1119         started_at = time.time()
1120         # wait until all specified guids have finished execution
1121         if self._wait_guids:
1122             while all(itertools.imap(controller.is_finished, self._wait_guids):
1123                 time.sleep(0.5)
1124         # wait until the minimum experiment duration time has elapsed 
1125         if self._duration:
1126             while (time.time() - started_at) < self._duration:
1127                 time.sleep(0.5)
1128         controller.stop()
1129         #download results!!
1130         controller.shutdown()
1131     """
1132
1133
1134
1135
1136
1137
1138