Daemonized servers are now always launched with popen, and not directly invoked in...
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16 import logging
# Configure default root-logger output at import time (module-level side
# effect; a no-op if the root logger already has handlers).
logging.basicConfig()

# Netref embedded in an attribute value, e.g. "{#[node1].[name]#}" or
# "{#[GUID-3].addr[0].[Address]#}". Named groups:
#   label     - text inside the first brackets
#   expr      - everything after the label
#   component - optional ".addr[N]", ".route[N]" or ".trace[id]"
#   attribute - text inside the last brackets
# NOTE(review): the "." right before "\[(?P<attribute>" is unescaped, so any
# single character is accepted there, not only a literal dot - confirm
# whether that is intended.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-format template that rebuilds a netref from "guid" (placed where the
# label goes) and "expr" values; its users are not visible in this section.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into "kind" and "index".
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
22
23 def _undefer(deferred):
24     if hasattr(deferred, '_get'):
25         return deferred._get()
26     else:
27         return deferred
28
29
class TestbedController(object):
    """
    Abstract interface of a testbed controller.

    Concrete controllers override the methods below; the base versions
    only raise NotImplementedError. The ``defer_*`` methods record
    instructions (elements, attributes, connections, traces, addresses,
    routes) and the ``do_*`` methods carry out the deployment phases that
    apply them.
    """
    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """The ``testbed_id`` passed to the constructor."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """The ``testbed_version`` passed to the constructor."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements managed by this controller."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element ``guid`` from factory ``factory_id``"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and their
        initial attributes are set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before start, elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Start the testbed; invoked after do_prestart."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute ``name`` of element ``guid`` to ``value`` at ``time``."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Return attribute ``name`` of element ``guid`` at ``time``."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """
        Return the attribute names of element ``guid``.

        ``filter_flags`` / ``exclude`` presumably select (or, with
        exclude=True, reject) attributes carrying the given flags, such as
        Attribute.ExecImmutable - confirm in concrete controllers.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id element ``guid`` was created from."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Perform ``action`` on element ``guid`` at ``time``."""
        raise NotImplementedError

    def status(self, guid):
        """Return the status of element ``guid`` (an ApplicationStatus value)."""
        raise NotImplementedError
    
    def testbed_status(self):
        """Return the status of the testbed as a whole."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return ``attribute`` of trace ``trace_id`` of element ``guid``."""
        raise NotImplementedError

    def traces_info(self):
        """
        Return trace metadata as nested dictionaries::

            {guid: {trace_id: {'host': host,
                               'filepath': filepath,
                               'filesize': size_in_bytes}}}
        """
        raise NotImplementedError

    def shutdown(self):
        """Shut the testbed controller down."""
        raise NotImplementedError
228
229 class ExperimentController(object):
230     def __init__(self, experiment_xml, root_dir):
231         self._experiment_design_xml = experiment_xml
232         self._experiment_execute_xml = None
233         self._testbeds = dict()
234         self._deployment_config = dict()
235         self._netrefs = collections.defaultdict(set)
236         self._testbed_netrefs = collections.defaultdict(set)
237         self._cross_data = dict()
238         self._root_dir = root_dir
239         self._netreffed_testbeds = set()
240         self._guids_in_testbed_cache = dict()
241         self._failed_testbeds = set()
242         self._started_time = None
243         self._stopped_time = None
244         
245         self._logger = logging.getLogger('nepi.core.execute')
246         
247         if experiment_xml is None and root_dir is not None:
248             # Recover
249             self.load_experiment_xml()
250             self.load_execute_xml()
251         else:
252             self.persist_experiment_xml()
253
254     @property
255     def experiment_design_xml(self):
256         return self._experiment_design_xml
257
258     @property
259     def experiment_execute_xml(self):
260         return self._experiment_execute_xml
261
262     @property
263     def started_time(self):
264         return self._started_time
265
266     @property
267     def stopped_time(self):
268         return self._stopped_time
269
270     @property
271     def guids(self):
272         guids = list()
273         for testbed_guid in self._testbeds.keys():
274             _guids = self._guids_in_testbed(testbed_guid)
275             if _guids:
276                 guids.extend(_guids)
277         return guids
278
279     def persist_experiment_xml(self):
280         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
281         f = open(xml_path, "w")
282         f.write(self._experiment_design_xml)
283         f.close()
284
285     def persist_execute_xml(self):
286         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
287         f = open(xml_path, "w")
288         f.write(self._experiment_execute_xml)
289         f.close()
290
291     def load_experiment_xml(self):
292         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
293         f = open(xml_path, "r")
294         self._experiment_design_xml = f.read()
295         f.close()
296
297     def load_execute_xml(self):
298         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
299         f = open(xml_path, "r")
300         self._experiment_execute_xml = f.read()
301         f.close()
302
303     def trace(self, guid, trace_id, attribute='value'):
304         testbed = self._testbed_for_guid(guid)
305         if testbed != None:
306             return testbed.trace(guid, trace_id, attribute)
307         raise RuntimeError("No element exists with guid %d" % guid)    
308
309     def traces_info(self):
310         traces_info = dict()
311         for guid, testbed in self._testbeds.iteritems():
312             tinfo = testbed.traces_info()
313             if tinfo:
314                 traces_info[guid] = testbed.traces_info()
315         return traces_info
316
317     @staticmethod
318     def _parallel(callables):
319         excs = []
320         def wrap(callable):
321             @functools.wraps(callable)
322             def wrapped(*p, **kw):
323                 try:
324                     callable(*p, **kw)
325                 except:
326                     logging.exception("Exception occurred in asynchronous thread:")
327                     excs.append(sys.exc_info())
328             return wrapped
329         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
330         for thread in threads:
331             thread.start()
332         for thread in threads:
333             thread.join()
334         for exc in excs:
335             eTyp, eVal, eLoc = exc
336             raise eTyp, eVal, eLoc
337
338     def start(self):
339         self._started_time = time.time() 
340         self._start()
341
342     def _start(self, recover = False):
343         parser = XmlExperimentParser()
344         
345         if recover:
346             xml = self._experiment_execute_xml
347         else:
348             xml = self._experiment_design_xml
349         data = parser.from_xml_to_data(xml)
350
351         # instantiate testbed controllers
352         to_recover, to_restart = self._init_testbed_controllers(data, recover)
353         all_restart = set(to_restart)
354         
355         if not recover:
356             # persist testbed connection data, for potential recovery
357             self._persist_testbed_proxies()
358         else:
359             # recover recoverable controllers
360             for guid in to_recover:
361                 try:
362                     self._testbeds[guid].do_setup()
363                     self._testbeds[guid].recover()
364                 except:
365                     self._logger.exception("During recovery of testbed %s", guid)
366                     
367                     # Mark failed
368                     self._failed_testbeds.add(guid)
369     
370         def steps_to_configure(self, allowed_guids):
371             # perform setup in parallel for all test beds,
372             # wait for all threads to finish
373             self._parallel([testbed.do_setup 
374                             for guid,testbed in self._testbeds.iteritems()
375                             if guid in allowed_guids])
376        
377             # perform create-connect in parallel, wait
378             # (internal connections only)
379             self._parallel([testbed.do_create
380                             for guid,testbed in self._testbeds.iteritems()
381                             if guid in allowed_guids])
382
383             self._parallel([testbed.do_connect_init
384                             for guid,testbed in self._testbeds.iteritems()
385                             if guid in allowed_guids])
386
387             self._parallel([testbed.do_connect_compl
388                             for guid,testbed in self._testbeds.iteritems()
389                             if guid in allowed_guids])
390
391             self._parallel([testbed.do_preconfigure
392                             for guid,testbed in self._testbeds.iteritems()
393                             if guid in allowed_guids])
394             self._clear_caches()
395
396         steps_to_configure(self, to_restart)
397
398         if self._netreffed_testbeds:
399             # initally resolve netrefs
400             self.do_netrefs(data, fail_if_undefined=False)
401             
402             # rinse and repeat, for netreffed testbeds
403             netreffed_testbeds = set(self._netreffed_testbeds)
404
405             to_recover, to_restart = self._init_testbed_controllers(data, recover)
406             all_restart.update(to_restart)
407             
408             if not recover:
409                 # persist testbed connection data, for potential recovery
410                 self._persist_testbed_proxies()
411             else:
412                 # recover recoverable controllers
413                 for guid in to_recover:
414                     try:
415                         self._testbeds[guid].do_setup()
416                         self._testbeds[guid].recover()
417                     except:
418                         self._logger.exception("During recovery of testbed %s", guid)
419
420                         # Mark failed
421                         self._failed_testbeds.add(guid)
422
423             # configure dependant testbeds
424             steps_to_configure(self, to_restart)
425         
426         all_restart = [ self._testbeds[guid] for guid in all_restart ]
427             
428         # final netref step, fail if anything's left unresolved
429         self.do_netrefs(data, fail_if_undefined=False)
430        
431         # Only now, that netref dependencies have been solve, it is safe to
432         # program cross_connections
433         self._program_testbed_cross_connections(data)
434  
435         # perform do_configure in parallel for al testbeds
436         # (it's internal configuration for each)
437         self._parallel([testbed.do_configure
438                         for testbed in all_restart])
439
440         self._clear_caches()
441
442         #print >>sys.stderr, "DO IT"
443         #import time
444         #time.sleep(60)
445         
446         # cross-connect (cannot be done in parallel)
447         for guid, testbed in self._testbeds.iteritems():
448             cross_data = self._get_cross_data(guid)
449             testbed.do_cross_connect_init(cross_data)
450         for guid, testbed in self._testbeds.iteritems():
451             cross_data = self._get_cross_data(guid)
452             testbed.do_cross_connect_compl(cross_data)
453        
454         self._clear_caches()
455
456         # Last chance to configure (parallel on all testbeds)
457         self._parallel([testbed.do_prestart
458                         for testbed in all_restart])
459
460         # final netref step, fail if anything's left unresolved
461         self.do_netrefs(data, fail_if_undefined=True)
462  
463         self._clear_caches()
464         
465         if not recover:
466             # update execution xml with execution-specific values
467             # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
468             self._update_execute_xml()
469             self.persist_execute_xml()
470
471         # start experiment (parallel start on all testbeds)
472         self._parallel([testbed.start
473                         for testbed in all_restart])
474
475         self._clear_caches()
476
477     def _clear_caches(self):
478         # Cleaning cache for safety.
479         self._guids_in_testbed_cache = dict()
480
481     def _persist_testbed_proxies(self):
482         TRANSIENT = (DC.RECOVER,)
483         
484         # persist access configuration for all testbeds, so that
485         # recovery mode can reconnect to them if it becomes necessary
486         conf = ConfigParser.RawConfigParser()
487         for testbed_guid, testbed_config in self._deployment_config.iteritems():
488             testbed_guid = str(testbed_guid)
489             conf.add_section(testbed_guid)
490             for attr in testbed_config.get_attribute_list():
491                 if attr not in TRANSIENT:
492                     conf.set(testbed_guid, attr, 
493                         testbed_config.get_attribute_value(attr))
494         
495         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
496         conf.write(f)
497         f.close()
498     
499     def _load_testbed_proxies(self):
500         TYPEMAP = {
501             Attribute.STRING : 'get',
502             Attribute.BOOL : 'getboolean',
503             Attribute.ENUM : 'get',
504             Attribute.DOUBLE : 'getfloat',
505             Attribute.INTEGER : 'getint',
506         }
507         
508         TRANSIENT = (DC.RECOVER,)
509         
510         # deferred import because proxy needs
511         # our class definitions to define proxies
512         import nepi.util.proxy as proxy
513         
514         conf = ConfigParser.RawConfigParser()
515         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
516         for testbed_guid in conf.sections():
517             testbed_config = proxy.AccessConfiguration()
518             testbed_guid = str(testbed_guid)
519             for attr in testbed_config.get_attribute_list():
520                 if attr not in TRANSIENT:
521                     getter = getattr(conf, TYPEMAP.get(
522                         testbed_config.get_attribute_type(attr),
523                         'get') )
524                     testbed_config.set_attribute_value(
525                         attr, getter(testbed_guid, attr))
526     
527     def _unpersist_testbed_proxies(self):
528         try:
529             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
530         except:
531             # Just print exceptions, this is just cleanup
532             self._logger.exception("Loading testbed configuration")
533
534     def _update_execute_xml(self):
535         # For all testbeds,
536         #   For all elements in testbed,
537         #       - gather immutable execute-readable attribuets lists
538         #         asynchronously
539         # Generate new design description from design xml
540         # (Wait for attributes lists - implicit syncpoint)
541         # For all testbeds,
542         #   For all elements in testbed,
543         #       - gather all immutable execute-readable attribute
544         #         values, asynchronously
545         # (Wait for attribute values - implicit syncpoint)
546         # For all testbeds,
547         #   For all elements in testbed,
548         #       - inject non-None values into new design
549         # Generate execute xml from new design
550
551         attribute_lists = dict(
552             (testbed_guid, collections.defaultdict(dict))
553             for testbed_guid in self._testbeds
554         )
555         
556         for testbed_guid, testbed in self._testbeds.iteritems():
557             guids = self._guids_in_testbed(testbed_guid)
558             for guid in guids:
559                 attribute_lists[testbed_guid][guid] = \
560                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
561         
562         parser = XmlExperimentParser()
563         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
564
565         attribute_values = dict(
566             (testbed_guid, collections.defaultdict(dict))
567             for testbed_guid in self._testbeds
568         )
569         
570         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
571             testbed = self._testbeds[testbed_guid]
572             for guid, attribute_list in testbed_attribute_lists.iteritems():
573                 attribute_list = _undefer(attribute_list)
574                 attribute_values[testbed_guid][guid] = dict(
575                     (attribute, testbed.get_deferred(guid, attribute))
576                     for attribute in attribute_list
577                 )
578         
579         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
580             for guid, attribute_values in testbed_attribute_values.iteritems():
581                 for attribute, value in attribute_values.iteritems():
582                     value = _undefer(value)
583                     if value is not None:
584                         execute_data.add_attribute_data(guid, attribute, value)
585         
586         self._experiment_execute_xml = parser.to_xml(data=execute_data)
587
588     def stop(self):
589        for testbed in self._testbeds.values():
590            testbed.stop()
591        self._unpersist_testbed_proxies()
592        self._stopped_time = time.time() 
593    
594     def recover(self):
595         # reload perviously persisted testbed access configurations
596         self._failed_testbeds.clear()
597         self._load_testbed_proxies()
598
599         # re-program testbeds that need recovery
600         self._start(recover = True)
601
602     def is_finished(self, guid):
603         testbed = self._testbed_for_guid(guid)
604         if testbed != None:
605             return testbed.status(guid) == AS.STATUS_FINISHED
606         raise RuntimeError("No element exists with guid %d" % guid)    
607     
608     def _testbed_recovery_policy(self, guid, data = None):
609         if data is None:
610             parser = XmlExperimentParser()
611             data = parser.from_xml_to_data(self._experiment_design_xml)
612         
613         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
614
615     def status(self, guid):
616         if guid in self._testbeds:
617             # guid is a testbed
618             # report testbed status
619             if guid in self._failed_testbeds:
620                 return TS.STATUS_FAILED
621             else:
622                 try:
623                     return self._testbeds[guid].status()
624                 except:
625                     return TS.STATUS_UNRESPONSIVE
626         else:
627             # guid is an element
628             testbed = self._testbed_for_guid(guid)
629             if testbed is not None:
630                 return testbed.status(guid)
631             else:
632                 return AS.STATUS_UNDETERMINED
633
634     def set(self, guid, name, value, time = TIME_NOW):
635         testbed = self._testbed_for_guid(guid)
636         if testbed != None:
637             testbed.set(guid, name, value, time)
638         else:
639             raise RuntimeError("No element exists with guid %d" % guid)    
640
641     def get(self, guid, name, time = TIME_NOW):
642         testbed = self._testbed_for_guid(guid)
643         if testbed != None:
644             return testbed.get(guid, name, time)
645         raise RuntimeError("No element exists with guid %d" % guid)    
646
647     def get_deferred(self, guid, name, time = TIME_NOW):
648         testbed = self._testbed_for_guid(guid)
649         if testbed != None:
650             return testbed.get_deferred(guid, name, time)
651         raise RuntimeError("No element exists with guid %d" % guid)    
652
653     def get_factory_id(self, guid):
654         testbed = self._testbed_for_guid(guid)
655         if testbed != None:
656             return testbed.get_factory_id(guid)
657         raise RuntimeError("No element exists with guid %d" % guid)    
658
659     def get_testbed_id(self, guid):
660         testbed = self._testbed_for_guid(guid)
661         if testbed != None:
662             return testbed.testbed_id
663         raise RuntimeError("No element exists with guid %d" % guid)    
664
665     def get_testbed_version(self, guid):
666         testbed = self._testbed_for_guid(guid)
667         if testbed != None:
668             return testbed.testbed_version
669         raise RuntimeError("No element exists with guid %d" % guid)    
670
671     def shutdown(self):
672         exceptions = list()
673         for testbed in self._testbeds.values():
674             try:
675                 testbed.shutdown()
676             except:
677                 exceptions.append(sys.exc_info())
678         for exc_info in exceptions:
679             raise exc_info[0], exc_info[1], exc_info[2]
680
681     def _testbed_for_guid(self, guid):
682         for testbed_guid in self._testbeds.keys():
683             if guid in self._guids_in_testbed(testbed_guid):
684                 if testbed_guid in self._failed_testbeds:
685                     return None
686                 return self._testbeds[testbed_guid]
687         return None
688
689     def _guids_in_testbed(self, testbed_guid):
690         if testbed_guid not in self._testbeds:
691             return set()
692         if testbed_guid not in self._guids_in_testbed_cache:
693             self._guids_in_testbed_cache[testbed_guid] = \
694                 set(self._testbeds[testbed_guid].guids)
695         return self._guids_in_testbed_cache[testbed_guid]
696
697     @staticmethod
698     def _netref_component_split(component):
699         match = COMPONENT_PATTERN.match(component)
700         if match:
701             return match.group("kind"), match.group("index")
702         else:
703             return component, None
704
705     _NETREF_COMPONENT_GETTERS = {
706         'addr':
707             lambda testbed, guid, index, name: 
708                 testbed.get_address(guid, int(index), name),
709         'route' :
710             lambda testbed, guid, index, name: 
711                 testbed.get_route(guid, int(index), name),
712         'trace' :
713             lambda testbed, guid, index, name: 
714                 testbed.trace(guid, index, attribute = name),
715         '' : 
716             lambda testbed, guid, index, name: 
717                 testbed.get(guid, name),
718     }
719     
720     def resolve_netref_value(self, value, failval = None):
721         rv = failval
722         while True:
723             for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
724                 label = match.group("label")
725                 if label.startswith('GUID-'):
726                     ref_guid = int(label[5:])
727                     if ref_guid:
728                         expr = match.group("expr")
729                         component = (match.group("component") or "")[1:] # skip the dot
730                         attribute = match.group("attribute")
731                         
732                         # split compound components into component kind and index
733                         # eg: 'addr[0]' -> ('addr', '0')
734                         component, component_index = self._netref_component_split(component)
735
736                         # find object and resolve expression
737                         for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
738                             if component not in self._NETREF_COMPONENT_GETTERS:
739                                 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
740                             elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
741                                 pass
742                             else:
743                                 ref_value = self._NETREF_COMPONENT_GETTERS[component](
744                                     ref_testbed, ref_guid, component_index, attribute)
745                                 if ref_value:
746                                     value = rv = value.replace(match.group(), ref_value)
747                                     break
748                         else:
749                             # unresolvable netref
750                             return failval
751                         break
752             else:
753                 break
754         return rv
755     
756     def do_netrefs(self, data, fail_if_undefined = False):
757         # element netrefs
758         for (testbed_guid, guid), attrs in self._netrefs.items():
759             testbed = self._testbeds.get(testbed_guid)
760             if testbed is not None:
761                 for name in set(attrs):
762                     value = testbed.get(guid, name)
763                     if isinstance(value, basestring):
764                         ref_value = self.resolve_netref_value(value)
765                         if ref_value is not None:
766                             testbed.set(guid, name, ref_value)
767                             attrs.remove(name)
768                         elif fail_if_undefined:
769                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
770                 if not attrs:
771                     del self._netrefs[(testbed_guid, guid)]
772         
773         # testbed netrefs
774         for testbed_guid, attrs in self._testbed_netrefs.items():
775             tb_data = dict(data.get_attribute_data(testbed_guid))
776             if data:
777                 for name in set(attrs):
778                     value = tb_data.get(name)
779                     if isinstance(value, basestring):
780                         ref_value = self.resolve_netref_value(value)
781                         if ref_value is not None:
782                             data.set_attribute_data(testbed_guid, name, ref_value)
783                             attrs.remove(name)
784                         elif fail_if_undefined:
785                             raise ValueError, "Unresolvable netref in: %r" % (value,)
786                 if not attrs:
787                     del self._testbed_netrefs[testbed_guid]
788         
789
790     def _init_testbed_controllers(self, data, recover = False):
791         blacklist_testbeds = set(self._testbeds)
792         element_guids = list()
793         label_guids = dict()
794         data_guids = data.guids
795         to_recover = set()
796         to_restart = set()
797
798         # gather label associations
799         for guid in data_guids:
800             if not data.is_testbed_data(guid):
801                 (testbed_guid, factory_id) = data.get_box_data(guid)
802                 label = data.get_attribute_data(guid, "label")
803                 if label is not None:
804                     if label in label_guids:
805                         raise RuntimeError, "Label %r is not unique" % (label,)
806                     label_guids[label] = guid
807
808         # create testbed controllers
809         for guid in data_guids:
810             if data.is_testbed_data(guid):
811                 if guid not in self._testbeds:
812                     try:
813                         self._create_testbed_controller(
814                             guid, data, element_guids, recover)
815                         if recover:
816                             # Already programmed
817                             blacklist_testbeds.add(guid)
818                         else:
819                             to_restart.add(guid)
820                     except:
821                         if recover:
822                             policy = self._testbed_recovery_policy(guid, data=data)
823                             if policy == DC.POLICY_RECOVER:
824                                 self._create_testbed_controller(
825                                     guid, data, element_guids, False)
826                                 to_recover.add(guid)
827                             elif policy == DC.POLICY_RESTART:
828                                 self._create_testbed_controller(
829                                     guid, data, element_guids, False)
830                                 to_restart.add(guid)
831                             else:
832                                 # Mark failed
833                                 self._failed_testbeds.add(guid)
834                         else:
835                             raise
836         
837         # queue programmable elements
838         #  - that have not been programmed already (blacklist_testbeds)
839         #  - including recovered or restarted testbeds
840         #  - but those that have no unresolved netrefs
841         for guid in data_guids:
842             if not data.is_testbed_data(guid):
843                 (testbed_guid, factory_id) = data.get_box_data(guid)
844                 if testbed_guid not in blacklist_testbeds:
845                     element_guids.append(guid)
846
847         # replace references to elements labels for its guid
848         self._resolve_labels(data, data_guids, label_guids)
849     
850         # program testbed controllers
851         if element_guids:
852             self._program_testbed_controllers(element_guids, data)
853         
854         return to_recover, to_restart
855
856     def _resolve_labels(self, data, data_guids, label_guids):
857         netrefs = self._netrefs
858         testbed_netrefs = self._testbed_netrefs
859         for guid in data_guids:
860             for name, value in data.get_attribute_data(guid):
861                 if isinstance(value, basestring):
862                     while True:
863                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
864                             label = match.group("label")
865                             if not label.startswith('GUID-'):
866                                 ref_guid = label_guids.get(label)
867                                 if ref_guid is not None:
868                                     value = value.replace(
869                                         match.group(),
870                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
871                                             guid = 'GUID-%d' % (ref_guid,),
872                                             expr = match.group("expr"),
873                                             label = label)
874                                     )
875                                     data.set_attribute_data(guid, name, value)
876                                     
877                                     # memorize which guid-attribute pairs require
878                                     # postprocessing, to avoid excessive controller-testbed
879                                     # communication at configuration time
880                                     # (which could require high-latency network I/O)
881                                     if not data.is_testbed_data(guid):
882                                         (testbed_guid, factory_id) = data.get_box_data(guid)
883                                         netrefs[(testbed_guid, guid)].add(name)
884                                     else:
885                                         testbed_netrefs[guid].add(name)
886                                     
887                                     break
888                         else:
889                             break
890
891     def _create_testbed_controller(self, guid, data, element_guids, recover):
892         (testbed_id, testbed_version) = data.get_testbed_data(guid)
893         deployment_config = self._deployment_config.get(guid)
894         
895         # deferred import because proxy needs
896         # our class definitions to define proxies
897         import nepi.util.proxy as proxy
898         
899         if deployment_config is None:
900             # need to create one
901             deployment_config = proxy.AccessConfiguration()
902             
903             for (name, value) in data.get_attribute_data(guid):
904                 if value is not None and deployment_config.has_attribute(name):
905                     # if any deployment config attribute has a netref, we can't
906                     # create this controller yet
907                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
908                         # remember to re-issue this one
909                         self._netreffed_testbeds.add(guid)
910                         return
911                     
912                     # copy deployment config attribute
913                     deployment_config.set_attribute_value(name, value)
914             
915             # commit config
916             self._deployment_config[guid] = deployment_config
917         
918         if deployment_config is not None:
919             # force recovery mode 
920             deployment_config.set_attribute_value("recover",recover)
921         
922         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
923                 deployment_config)
924         for (name, value) in data.get_attribute_data(guid):
925             testbed.defer_configure(name, value)
926         self._testbeds[guid] = testbed
927         if guid in self._netreffed_testbeds:
928             self._netreffed_testbeds.remove(guid)
929
930     def _program_testbed_controllers(self, element_guids, data):
931         def resolve_create_netref(data, guid, name, value): 
932             # Try to resolve create-time netrefs, if possible
933             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
934                 try:
935                     nuvalue = self.resolve_netref_value(value)
936                 except:
937                     # Any trouble means we're not in shape to resolve the netref yet
938                     nuvalue = None
939                 if nuvalue is not None:
940                     # Only if we succeed we remove the netref deferral entry
941                     value = nuvalue
942                     data.set_attribute_data(guid, name, value)
943                     if (testbed_guid, guid) in self._netrefs:
944                         self._netrefs[(testbed_guid, guid)].discard(name)
945             return value
946
947         for guid in element_guids:
948             (testbed_guid, factory_id) = data.get_box_data(guid)
949             testbed = self._testbeds.get(testbed_guid)
950             if testbed is not None:
951                 # create
952                 testbed.defer_create(guid, factory_id)
953                 # set attributes
954                 for (name, value) in data.get_attribute_data(guid):
955                     value = resolve_create_netref(data, guid, name, value)
956                     testbed.defer_create_set(guid, name, value)
957
958         for guid in element_guids:
959             (testbed_guid, factory_id) = data.get_box_data(guid)
960             testbed = self._testbeds.get(testbed_guid)
961             if testbed is not None:
962                 # traces
963                 for trace_id in data.get_trace_data(guid):
964                     testbed.defer_add_trace(guid, trace_id)
965                 # addresses
966                 for (address, netprefix, broadcast) in data.get_address_data(guid):
967                     if address != None:
968                         testbed.defer_add_address(guid, address, netprefix, 
969                                 broadcast)
970                 # routes
971                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
972                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
973                 # store connections data
974                 for (connector_type_name, other_guid, other_connector_type_name) \
975                         in data.get_connection_data(guid):
976                     (other_testbed_guid, other_factory_id) = data.get_box_data(
977                             other_guid)
978                     if testbed_guid == other_testbed_guid:
979                         # each testbed should take care of enforcing internal
980                         # connection simmetry, so each connection is only
981                         # added in one direction
982                         testbed.defer_connect(guid, connector_type_name, 
983                                 other_guid, other_connector_type_name)
984
985     def _program_testbed_cross_connections(self, data):
986         data_guids = data.guids
987         for guid in data_guids: 
988             if not data.is_testbed_data(guid):
989                 (testbed_guid, factory_id) = data.get_box_data(guid)
990                 testbed = self._testbeds.get(testbed_guid)
991                 if testbed is not None:
992                     for (connector_type_name, cross_guid, cross_connector_type_name) \
993                             in data.get_connection_data(guid):
994                         (testbed_guid, factory_id) = data.get_box_data(guid)
995                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
996                                 cross_guid)
997                         if testbed_guid != cross_testbed_guid:
998                             cross_testbed = self._testbeds[cross_testbed_guid]
999                             cross_testbed_id = cross_testbed.testbed_id
1000                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1001                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1002                                     cross_connector_type_name)
1003                             # save cross data for later
1004                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1005                                     cross_guid)
1006
1007     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1008         if testbed_guid not in self._cross_data:
1009             self._cross_data[testbed_guid] = dict()
1010         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1011             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1012         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1013
1014     def _get_cross_data(self, testbed_guid):
1015         cross_data = dict()
1016         if not testbed_guid in self._cross_data:
1017             return cross_data
1018
1019         # fetch attribute lists in one batch
1020         attribute_lists = dict()
1021         for cross_testbed_guid, guid_list in \
1022                 self._cross_data[testbed_guid].iteritems():
1023             cross_testbed = self._testbeds[cross_testbed_guid]
1024             for cross_guid in guid_list:
1025                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1026                     cross_testbed.get_attribute_list_deferred(cross_guid)
1027
1028         # fetch attribute values in another batch
1029         for cross_testbed_guid, guid_list in \
1030                 self._cross_data[testbed_guid].iteritems():
1031             cross_data[cross_testbed_guid] = dict()
1032             cross_testbed = self._testbeds[cross_testbed_guid]
1033             for cross_guid in guid_list:
1034                 elem_cross_data = dict(
1035                     _guid = cross_guid,
1036                     _testbed_guid = cross_testbed_guid,
1037                     _testbed_id = cross_testbed.testbed_id,
1038                     _testbed_version = cross_testbed.testbed_version)
1039                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1040                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1041                 for attr_name in attribute_list:
1042                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1043                     elem_cross_data[attr_name] = attr_value
1044         
1045         # undefer all values - we'll have to serialize them probably later
1046         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1047             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1048                 for attr_name, attr_value in elem_cross_data.iteritems():
1049                     elem_cross_data[attr_name] = _undefer(attr_value)
1050         
1051         return cross_data
1052     """
1053 class ExperimentSuite(object):
1054     def __init__(self, experiment_xml, access_config, repetitions,
1055             duration, wait_guids):
1056         self._experiment_xml = experiment_xml
1057         self._access_config = access_config
1058         self._experiments = dict()
1059         self._repetitions = repetitions
1060         self._duration = duration
1061         self._wait_guids = wait_guids
1062         self._current = None
1063         self._status = TS.STATUS_ZERO
1064         self._thread = None
1065
1066     def start(self):
1067         self._status  = TS.STATUS_STARTED
1068         self._thread = threading.Thread(target = self._run_experiment_suite)
1069         self._thread.start()
1070
1071     def shutdown(self):
1072         if self._thread:
1073             self._thread.join()
1074             self._thread = None
1075
1076     def _run_experiment_suite(self):
1077         for i in xrange[0, self.repetitions]:
1078             self._current = i
1079             self._run_one_experiment()
1080
1081     def _run_one_experiment(self):
1082         access_config = proxy.AccessConfiguration()
1083         for attr in self._access_config.attributes:
1084             access_config.set_attribute_value(attr.name, attr.value)
1085         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
1086         root_dir = "%s_%d" % (
1087                 access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
1088                 self._current)
1089         access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
1090         controller = proxy.create_experiment_controller(self._experiment_xml,
1091                 access_config)
1092         self._experiments[self._current] = controller
1093         controller.start()
1094         started_at = time.time()
1095         # wait until all specified guids have finished execution
1096         if self._wait_guids:
1097             while all(itertools.imap(controller.is_finished, self._wait_guids):
1098                 time.sleep(0.5)
1099         # wait until the minimum experiment duration time has elapsed 
1100         if self._duration:
1101             while (time.time() - started_at) < self._duration:
1102                 time.sleep(0.5)
1103         controller.stop()
1104         #download results!!
1105         controller.shutdown()
1106     """
1107
1108
1109
1110
1111
1112
1113