Fix shutdown order to respect creation order (important when running nepi-in-nepi)
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16 import logging
# Configures the root logger as a side effect of importing this module.
logging.basicConfig()

# Matches one netref placeholder embedded in an attribute value, e.g.
#   {#[GUID-3].addr[0].[Address]#}   or   {#[GUID-3].[Name]#}
# Named groups:
#   label     - text inside the first brackets (resolve_netref_value only
#               acts on labels of the form "GUID-<n>")
#   expr      - everything after the label, up to the closing "#}"
#   component - optional ".addr[N]", ".route[N]" or ".trace[id]" selector
#   attribute - attribute name inside the last brackets
# NOTE(review): the "." right before "\[(?P<attribute>" is unescaped, so it
# matches any single character, not only a literal dot.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-format template that rebuilds a netref from a "guid" label and an "expr"
# tail. Not used in the visible part of this file; presumably used to
# rewrite labels into GUID form elsewhere - verify at the call sites.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a component such as "addr[0]" into kind ("addr") and index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
22
23 def _undefer(deferred):
24     if hasattr(deferred, '_get'):
25         return deferred._get()
26     else:
27         return deferred
28
29
class TestbedController(object):
    """Interface implemented by every testbed controller.

    ExperimentController programs a testbed through the defer_* methods and
    drives its deployment through the do_* phases, which
    ExperimentController._start invokes in this order:

        do_setup, do_create, do_connect_init, do_connect_compl,
        do_preconfigure, do_configure, do_cross_connect_init,
        do_cross_connect_compl, do_prestart, start

    Every method of this base class raises NotImplementedError; concrete
    controllers override them.
    """

    def __init__(self, testbed_id, testbed_version):
        """Store the identifier and version exposed by the properties below."""
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements held by this testbed.

        ExperimentController caches ``set(guids)`` per testbed to find the
        testbed that owns a given element guid.
        """
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element ``guid`` from factory ``factory_id``"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2,
            connector_type_name2):
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self,
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast):
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and their
        attributes are set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections
        between different testbed elements is performed.

        cross_data is built by ExperimentController._get_cross_data for
        this testbed.
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections
        between different testbed elements is performed.

        cross_data is built by ExperimentController._get_cross_data for
        this testbed.
        """
        raise NotImplementedError

    def start(self):
        """Final deployment step.

        ExperimentController._start calls it (in parallel across testbeds)
        after do_prestart and the final, strict netref resolution.
        """
        raise NotImplementedError

    def stop(self):
        """Called by ExperimentController.stop on every testbed."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute ``name`` of element ``guid`` to ``value`` at ``time``."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Return attribute ``name`` of element ``guid`` at ``time``."""
        raise NotImplementedError

    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """Return the attribute names of element ``guid``.

        NOTE(review): the exact meaning of filter_flags/exclude is defined by
        the implementations.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id element ``guid`` was created from."""
        raise NotImplementedError

    def action(self, time, guid, action):
        raise NotImplementedError

    def status(self, guid = None):
        """Status of element ``guid``.

        ExperimentController.is_finished compares the result with
        ApplicationStatus.STATUS_FINISHED. ExperimentController.status also
        calls this without a guid when asked about a testbed; presumably
        implementations then report the testbed's own status - TODO confirm.
        """
        raise NotImplementedError

    def testbed_status(self):
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return ``attribute`` of trace ``trace_id`` of element ``guid``."""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Called by ExperimentController.shutdown, which walks testbeds in
        reverse order of the groups in which they were configured."""
        raise NotImplementedError
228
229 class ExperimentController(object):
230     def __init__(self, experiment_xml, root_dir):
231         self._experiment_design_xml = experiment_xml
232         self._experiment_execute_xml = None
233         self._testbeds = dict()
234         self._deployment_config = dict()
235         self._netrefs = collections.defaultdict(set)
236         self._testbed_netrefs = collections.defaultdict(set)
237         self._cross_data = dict()
238         self._root_dir = root_dir
239         self._netreffed_testbeds = set()
240         self._guids_in_testbed_cache = dict()
241         self._failed_testbeds = set()
242         self._started_time = None
243         self._stopped_time = None
244         self._testbed_order = []
245       
246         self._logger = logging.getLogger('nepi.core.execute')
247         level = logging.ERROR
248         if os.environ.get("NEPI_CONTROLLER_LOGLEVEL", 
249                 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
250             level = logging.DEBUG
251         self._logger.setLevel(level)
252  
253         if experiment_xml is None and root_dir is not None:
254             # Recover
255             self.load_experiment_xml()
256             self.load_execute_xml()
257         else:
258             self.persist_experiment_xml()
259
260     @property
261     def experiment_design_xml(self):
262         return self._experiment_design_xml
263
264     @property
265     def experiment_execute_xml(self):
266         return self._experiment_execute_xml
267
268     @property
269     def started_time(self):
270         return self._started_time
271
272     @property
273     def stopped_time(self):
274         return self._stopped_time
275
276     @property
277     def guids(self):
278         guids = list()
279         for testbed_guid in self._testbeds.keys():
280             _guids = self._guids_in_testbed(testbed_guid)
281             if _guids:
282                 guids.extend(_guids)
283         return guids
284
285     def persist_experiment_xml(self):
286         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
287         f = open(xml_path, "w")
288         f.write(self._experiment_design_xml)
289         f.close()
290
291     def persist_execute_xml(self):
292         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
293         f = open(xml_path, "w")
294         f.write(self._experiment_execute_xml)
295         f.close()
296
297     def load_experiment_xml(self):
298         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
299         f = open(xml_path, "r")
300         self._experiment_design_xml = f.read()
301         f.close()
302
303     def load_execute_xml(self):
304         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
305         f = open(xml_path, "r")
306         self._experiment_execute_xml = f.read()
307         f.close()
308
309     def trace(self, guid, trace_id, attribute='value'):
310         testbed = self._testbed_for_guid(guid)
311         if testbed != None:
312             return testbed.trace(guid, trace_id, attribute)
313         raise RuntimeError("No element exists with guid %d" % guid)    
314
315     def traces_info(self):
316         traces_info = dict()
317         for guid, testbed in self._testbeds.iteritems():
318             tinfo = testbed.traces_info()
319             if tinfo:
320                 traces_info[guid] = testbed.traces_info()
321         return traces_info
322
323     @staticmethod
324     def _parallel(callables):
325         excs = []
326         def wrap(callable):
327             def wrapped(*p, **kw):
328                 try:
329                     callable(*p, **kw)
330                 except:
331                     logging.exception("Exception occurred in asynchronous thread:")
332                     excs.append(sys.exc_info())
333             try:
334                 wrapped = functools.wraps(callable)(wrapped)
335             except:
336                 # functools.partial not wrappable
337                 pass
338             return wrapped
339         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
340         for thread in threads:
341             thread.start()
342         for thread in threads:
343             thread.join()
344         for exc in excs:
345             eTyp, eVal, eLoc = exc
346             raise eTyp, eVal, eLoc
347
348     def start(self):
349         self._started_time = time.time() 
350         self._start()
351
352     def _start(self, recover = False):
353         parser = XmlExperimentParser()
354         
355         if recover:
356             xml = self._experiment_execute_xml
357         else:
358             xml = self._experiment_design_xml
359         data = parser.from_xml_to_data(xml)
360
361         # instantiate testbed controllers
362         to_recover, to_restart = self._init_testbed_controllers(data, recover)
363         all_restart = set(to_restart)
364         
365         if not recover:
366             # persist testbed connection data, for potential recovery
367             self._persist_testbed_proxies()
368         else:
369             # recover recoverable controllers
370             for guid in to_recover:
371                 try:
372                     self._testbeds[guid].do_setup()
373                     self._testbeds[guid].recover()
374                 except:
375                     self._logger.exception("During recovery of testbed %s", guid)
376                     
377                     # Mark failed
378                     self._failed_testbeds.add(guid)
379     
380         def steps_to_configure(self, allowed_guids):
381             # perform setup in parallel for all test beds,
382             # wait for all threads to finish
383
384             self._logger.debug("ExperimentController: Starting parallel do_setup")
385             self._parallel([testbed.do_setup 
386                             for guid,testbed in self._testbeds.iteritems()
387                             if guid in allowed_guids])
388        
389             # perform create-connect in parallel, wait
390             # (internal connections only)
391             self._logger.debug("ExperimentController: Starting parallel do_create")
392             self._parallel([testbed.do_create
393                             for guid,testbed in self._testbeds.iteritems()
394                             if guid in allowed_guids])
395
396             self._logger.debug("ExperimentController: Starting parallel do_connect_init")
397             self._parallel([testbed.do_connect_init
398                             for guid,testbed in self._testbeds.iteritems()
399                             if guid in allowed_guids])
400
401             self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
402             self._parallel([testbed.do_connect_compl
403                             for guid,testbed in self._testbeds.iteritems()
404                             if guid in allowed_guids])
405
406             self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
407             self._parallel([testbed.do_preconfigure
408                             for guid,testbed in self._testbeds.iteritems()
409                             if guid in allowed_guids])
410             self._clear_caches()
411             
412             # Store testbed order
413             self._testbed_order.append(allowed_guids)
414
415         steps_to_configure(self, to_restart)
416
417         if self._netreffed_testbeds:
418             self._logger.debug("ExperimentController: Resolving netreffed testbeds")
419             # initally resolve netrefs
420             self.do_netrefs(data, fail_if_undefined=False)
421             
422             # rinse and repeat, for netreffed testbeds
423             netreffed_testbeds = set(self._netreffed_testbeds)
424
425             to_recover, to_restart = self._init_testbed_controllers(data, recover)
426             all_restart.update(to_restart)
427             
428             if not recover:
429                 # persist testbed connection data, for potential recovery
430                 self._persist_testbed_proxies()
431             else:
432                 # recover recoverable controllers
433                 for guid in to_recover:
434                     try:
435                         self._testbeds[guid].do_setup()
436                         self._testbeds[guid].recover()
437                     except:
438                         self._logger.exception("During recovery of testbed %s", guid)
439
440                         # Mark failed
441                         self._failed_testbeds.add(guid)
442
443             # configure dependant testbeds
444             steps_to_configure(self, to_restart)
445         
446         all_restart = [ self._testbeds[guid] for guid in all_restart ]
447             
448         # final netref step, fail if anything's left unresolved
449         self._logger.debug("ExperimentController: Resolving do_netrefs")
450         self.do_netrefs(data, fail_if_undefined=False)
451        
452         # Only now, that netref dependencies have been solve, it is safe to
453         # program cross_connections
454         self._logger.debug("ExperimentController: Programming testbed cross-connections")
455         self._program_testbed_cross_connections(data)
456  
457         # perform do_configure in parallel for al testbeds
458         # (it's internal configuration for each)
459         self._logger.debug("ExperimentController: Starting parallel do_configure")
460         self._parallel([testbed.do_configure
461                         for testbed in all_restart])
462
463         self._clear_caches()
464
465         #print >>sys.stderr, "DO IT"
466         #import time
467         #time.sleep(60)
468         
469         # cross-connect (cannot be done in parallel)
470         self._logger.debug("ExperimentController: Starting cross-connect")
471         for guid, testbed in self._testbeds.iteritems():
472             cross_data = self._get_cross_data(guid)
473             testbed.do_cross_connect_init(cross_data)
474         for guid, testbed in self._testbeds.iteritems():
475             cross_data = self._get_cross_data(guid)
476             testbed.do_cross_connect_compl(cross_data)
477        
478         self._clear_caches()
479
480         # Last chance to configure (parallel on all testbeds)
481         self._logger.debug("ExperimentController: Starting parallel do_prestart")
482         self._parallel([testbed.do_prestart
483                         for testbed in all_restart])
484
485         # final netref step, fail if anything's left unresolved
486         self.do_netrefs(data, fail_if_undefined=True)
487  
488         self._clear_caches()
489         
490         if not recover:
491             # update execution xml with execution-specific values
492             # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
493             self._update_execute_xml()
494             self.persist_execute_xml()
495
496         # start experiment (parallel start on all testbeds)
497         self._logger.debug("ExperimentController: Starting parallel do_start")
498         self._parallel([testbed.start
499                         for testbed in all_restart])
500
501         self._clear_caches()
502
503     def _clear_caches(self):
504         # Cleaning cache for safety.
505         self._guids_in_testbed_cache = dict()
506
507     def _persist_testbed_proxies(self):
508         TRANSIENT = (DC.RECOVER,)
509         
510         # persist access configuration for all testbeds, so that
511         # recovery mode can reconnect to them if it becomes necessary
512         conf = ConfigParser.RawConfigParser()
513         for testbed_guid, testbed_config in self._deployment_config.iteritems():
514             testbed_guid = str(testbed_guid)
515             conf.add_section(testbed_guid)
516             for attr in testbed_config.get_attribute_list():
517                 if attr not in TRANSIENT:
518                     value = testbed_config.get_attribute_value(attr)
519                     if value is not None:
520                         conf.set(testbed_guid, attr, value)
521         
522         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
523         conf.write(f)
524         f.close()
525     
526     def _load_testbed_proxies(self):
527         TYPEMAP = {
528             Attribute.STRING : 'get',
529             Attribute.BOOL : 'getboolean',
530             Attribute.ENUM : 'get',
531             Attribute.DOUBLE : 'getfloat',
532             Attribute.INTEGER : 'getint',
533         }
534         
535         TRANSIENT = (DC.RECOVER,)
536         
537         # deferred import because proxy needs
538         # our class definitions to define proxies
539         import nepi.util.proxy as proxy
540         
541         conf = ConfigParser.RawConfigParser()
542         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
543         for testbed_guid in conf.sections():
544             testbed_config = proxy.AccessConfiguration()
545             testbed_guid = str(testbed_guid)
546             for attr in testbed_config.get_attribute_list():
547                 if attr not in TRANSIENT:
548                     getter = getattr(conf, TYPEMAP.get(
549                         testbed_config.get_attribute_type(attr),
550                         'get') )
551                     try:
552                         value = getter(testbed_guid, attr)
553                         testbed_config.set_attribute_value(attr, value)
554                     except ConfigParser.NoOptionError:
555                         # Leave default
556                         pass
557     
558     def _unpersist_testbed_proxies(self):
559         try:
560             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
561         except:
562             # Just print exceptions, this is just cleanup
563             self._logger.exception("Loading testbed configuration")
564
565     def _update_execute_xml(self):
566         # For all testbeds,
567         #   For all elements in testbed,
568         #       - gather immutable execute-readable attribuets lists
569         #         asynchronously
570         # Generate new design description from design xml
571         # (Wait for attributes lists - implicit syncpoint)
572         # For all testbeds,
573         #   For all elements in testbed,
574         #       - gather all immutable execute-readable attribute
575         #         values, asynchronously
576         # (Wait for attribute values - implicit syncpoint)
577         # For all testbeds,
578         #   For all elements in testbed,
579         #       - inject non-None values into new design
580         # Generate execute xml from new design
581
582         attribute_lists = dict(
583             (testbed_guid, collections.defaultdict(dict))
584             for testbed_guid in self._testbeds
585         )
586         
587         for testbed_guid, testbed in self._testbeds.iteritems():
588             guids = self._guids_in_testbed(testbed_guid)
589             for guid in guids:
590                 attribute_lists[testbed_guid][guid] = \
591                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
592         
593         parser = XmlExperimentParser()
594         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
595
596         attribute_values = dict(
597             (testbed_guid, collections.defaultdict(dict))
598             for testbed_guid in self._testbeds
599         )
600         
601         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
602             testbed = self._testbeds[testbed_guid]
603             for guid, attribute_list in testbed_attribute_lists.iteritems():
604                 attribute_list = _undefer(attribute_list)
605                 attribute_values[testbed_guid][guid] = dict(
606                     (attribute, testbed.get_deferred(guid, attribute))
607                     for attribute in attribute_list
608                 )
609         
610         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
611             for guid, attribute_values in testbed_attribute_values.iteritems():
612                 for attribute, value in attribute_values.iteritems():
613                     value = _undefer(value)
614                     if value is not None:
615                         execute_data.add_attribute_data(guid, attribute, value)
616         
617         self._experiment_execute_xml = parser.to_xml(data=execute_data)
618
619     def stop(self):
620        for testbed in self._testbeds.values():
621            testbed.stop()
622        self._unpersist_testbed_proxies()
623        self._stopped_time = time.time() 
624    
625     def recover(self):
626         # reload perviously persisted testbed access configurations
627         self._failed_testbeds.clear()
628         self._load_testbed_proxies()
629
630         # re-program testbeds that need recovery
631         self._start(recover = True)
632
633     def is_finished(self, guid):
634         testbed = self._testbed_for_guid(guid)
635         if testbed != None:
636             return testbed.status(guid) == AS.STATUS_FINISHED
637         raise RuntimeError("No element exists with guid %d" % guid)    
638     
639     def _testbed_recovery_policy(self, guid, data = None):
640         if data is None:
641             parser = XmlExperimentParser()
642             data = parser.from_xml_to_data(self._experiment_design_xml)
643         
644         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
645
646     def status(self, guid):
647         if guid in self._testbeds:
648             # guid is a testbed
649             # report testbed status
650             if guid in self._failed_testbeds:
651                 return TS.STATUS_FAILED
652             else:
653                 try:
654                     return self._testbeds[guid].status()
655                 except:
656                     return TS.STATUS_UNRESPONSIVE
657         else:
658             # guid is an element
659             testbed = self._testbed_for_guid(guid)
660             if testbed is not None:
661                 return testbed.status(guid)
662             else:
663                 return AS.STATUS_UNDETERMINED
664
665     def set(self, guid, name, value, time = TIME_NOW):
666         testbed = self._testbed_for_guid(guid)
667         if testbed != None:
668             testbed.set(guid, name, value, time)
669         else:
670             raise RuntimeError("No element exists with guid %d" % guid)    
671
672     def get(self, guid, name, time = TIME_NOW):
673         testbed = self._testbed_for_guid(guid)
674         if testbed != None:
675             return testbed.get(guid, name, time)
676         raise RuntimeError("No element exists with guid %d" % guid)    
677
678     def get_deferred(self, guid, name, time = TIME_NOW):
679         testbed = self._testbed_for_guid(guid)
680         if testbed != None:
681             return testbed.get_deferred(guid, name, time)
682         raise RuntimeError("No element exists with guid %d" % guid)    
683
684     def get_factory_id(self, guid):
685         testbed = self._testbed_for_guid(guid)
686         if testbed != None:
687             return testbed.get_factory_id(guid)
688         raise RuntimeError("No element exists with guid %d" % guid)    
689
690     def get_testbed_id(self, guid):
691         testbed = self._testbed_for_guid(guid)
692         if testbed != None:
693             return testbed.testbed_id
694         raise RuntimeError("No element exists with guid %d" % guid)    
695
696     def get_testbed_version(self, guid):
697         testbed = self._testbed_for_guid(guid)
698         if testbed != None:
699             return testbed.testbed_version
700         raise RuntimeError("No element exists with guid %d" % guid)    
701
702     def shutdown(self):
703         exceptions = list()
704         ordered_testbeds = set()
705
706         def shutdown_testbed(guid):
707             try:
708                 testbed = self._testbeds[guid]
709                 ordered_testbeds.add(guid)
710                 testbed.shutdown()
711             except:
712                 exceptions.append(sys.exc_info())
713                 
714         self._logger.debug("ExperimentController: Starting parallel shutdown")
715         
716         for testbed_guids in reversed(self._testbed_order):
717             self._parallel([functools.partial(shutdown_testbed, guid)
718                             for guid in testbed_guids])
719         remaining_guids = set(self._testbeds) - ordered_testbeds
720         if remaining_guids:
721             self._parallel([functools.partial(shutdown_testbed, guid)
722                             for guid in remaining_guids])
723             
724         for exc_info in exceptions:
725             raise exc_info[0], exc_info[1], exc_info[2]
726
727     def _testbed_for_guid(self, guid):
728         for testbed_guid in self._testbeds.keys():
729             if guid in self._guids_in_testbed(testbed_guid):
730                 if testbed_guid in self._failed_testbeds:
731                     return None
732                 return self._testbeds[testbed_guid]
733         return None
734
735     def _guids_in_testbed(self, testbed_guid):
736         if testbed_guid not in self._testbeds:
737             return set()
738         if testbed_guid not in self._guids_in_testbed_cache:
739             self._guids_in_testbed_cache[testbed_guid] = \
740                 set(self._testbeds[testbed_guid].guids)
741         return self._guids_in_testbed_cache[testbed_guid]
742
743     @staticmethod
744     def _netref_component_split(component):
745         match = COMPONENT_PATTERN.match(component)
746         if match:
747             return match.group("kind"), match.group("index")
748         else:
749             return component, None
750
751     _NETREF_COMPONENT_GETTERS = {
752         'addr':
753             lambda testbed, guid, index, name: 
754                 testbed.get_address(guid, int(index), name),
755         'route' :
756             lambda testbed, guid, index, name: 
757                 testbed.get_route(guid, int(index), name),
758         'trace' :
759             lambda testbed, guid, index, name: 
760                 testbed.trace(guid, index, attribute = name),
761         '' : 
762             lambda testbed, guid, index, name: 
763                 testbed.get(guid, name),
764     }
765     
    def resolve_netref_value(self, value, failval = None):
        """Substitute netref placeholders in ``value`` with referenced values.

        Only ATTRIBUTE_PATTERN_BASE matches whose label is "GUID-<n>" with a
        non-zero n are resolved; other placeholders are skipped and left in
        place. Element n is looked up in the testbeds, and its value is
        fetched through _NETREF_COMPONENT_GETTERS. After each substitution,
        scanning restarts on the updated string.

        Returns the updated string if at least one substitution was made
        (skipped placeholders may remain in it). Returns ``failval`` when
        nothing was substituted, or when some GUID placeholder cannot be
        resolved (no testbed yields a truthy value for it).

        Raises ValueError for an unknown component kind (checked only while
        scanning testbeds, so not when there are none). int() also raises
        ValueError for a non-numeric "GUID-" suffix.

        NOTE(review): ref_value goes straight into str.replace, so a
        non-string value (e.g. an int attribute) would raise TypeError. A
        substituted value that reintroduces the same placeholder would loop
        forever. Confirm neither can happen in practice.
        """
        # stays failval unless at least one substitution happens
        rv = failval
        # rescan from scratch after every substitution
        while True:
            for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
                label = match.group("label")
                if label.startswith('GUID-'):
                    ref_guid = int(label[5:])
                    if ref_guid:
                        expr = match.group("expr")
                        component = (match.group("component") or "")[1:] # skip the dot
                        attribute = match.group("attribute")
                        
                        # split compound components into component kind and index
                        # eg: 'addr[0]' -> ('addr', '0')
                        component, component_index = self._netref_component_split(component)

                        # find object and resolve expression
                        for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
                            if component not in self._NETREF_COMPONENT_GETTERS:
                                raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
                            elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
                                pass
                            else:
                                ref_value = self._NETREF_COMPONENT_GETTERS[component](
                                    ref_testbed, ref_guid, component_index, attribute)
                                # falsy values count as "not resolved here"
                                if ref_value:
                                    value = rv = value.replace(match.group(), ref_value)
                                    break
                        else:
                            # unresolvable netref
                            return failval
                        # value changed: restart finditer on the new string
                        break
            else:
                # no resolvable GUID placeholder left: done
                break
        return rv
801     
802     def do_netrefs(self, data, fail_if_undefined = False):
803         # element netrefs
804         for (testbed_guid, guid), attrs in self._netrefs.items():
805             testbed = self._testbeds.get(testbed_guid)
806             if testbed is not None:
807                 for name in set(attrs):
808                     value = testbed.get(guid, name)
809                     if isinstance(value, basestring):
810                         ref_value = self.resolve_netref_value(value)
811                         if ref_value is not None:
812                             testbed.set(guid, name, ref_value)
813                             attrs.remove(name)
814                         elif fail_if_undefined:
815                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
816                 if not attrs:
817                     del self._netrefs[(testbed_guid, guid)]
818         
819         # testbed netrefs
820         for testbed_guid, attrs in self._testbed_netrefs.items():
821             tb_data = dict(data.get_attribute_data(testbed_guid))
822             if data:
823                 for name in set(attrs):
824                     value = tb_data.get(name)
825                     if isinstance(value, basestring):
826                         ref_value = self.resolve_netref_value(value)
827                         if ref_value is not None:
828                             data.set_attribute_data(testbed_guid, name, ref_value)
829                             attrs.remove(name)
830                         elif fail_if_undefined:
831                             raise ValueError, "Unresolvable netref in: %r" % (value,)
832                 if not attrs:
833                     del self._testbed_netrefs[testbed_guid]
834         
835
    def _init_testbed_controllers(self, data, recover = False):
        """Create the missing testbed controllers for ``data`` and program them.

        Testbeds that already have a controller in self._testbeds are not
        re-created, and their elements are not re-programmed.

        Without ``recover``, each new controller is created normally and its
        guid is returned in ``to_restart``; creation errors propagate.

        With ``recover``, creation is first attempted with recover=True. On
        success the testbed counts as already programmed, and its elements
        are not queued. If that attempt raises, the testbed recovery policy
        decides what happens next:
        - DC.POLICY_RECOVER re-creates it with recover=False and returns it
          in ``to_recover``;
        - DC.POLICY_RESTART re-creates it with recover=False and returns it
          in ``to_restart``;
        - any other policy adds it to self._failed_testbeds.

        Afterwards, element labels in netrefs are rewritten to GUID form and
        the queued elements are programmed on their controllers.

        :param data: experiment description to instantiate.
        :param recover: forwarded to _create_testbed_controller on the first
            creation attempt.
        :returns: ``(to_recover, to_restart)``, sets of testbed guids.
        :raises RuntimeError: if two elements share the same label.
        """
        # testbeds whose elements must NOT be (re)programmed
        blacklist_testbeds = set(self._testbeds)
        element_guids = list()
        label_guids = dict()
        data_guids = data.guids
        to_recover = set()
        to_restart = set()

        # gather label associations
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                # NOTE(review): box data is fetched but not used in this loop
                (testbed_guid, factory_id) = data.get_box_data(guid)
                label = data.get_attribute_data(guid, "label")
                if label is not None:
                    if label in label_guids:
                        raise RuntimeError, "Label %r is not unique" % (label,)
                    label_guids[label] = guid

        # create testbed controllers
        for guid in data_guids:
            if data.is_testbed_data(guid):
                if guid not in self._testbeds:
                    try:
                        # NOTE(review): _create_testbed_controller may return
                        # without creating anything (netref in the deployment
                        # config); the guid is still added to to_restart below
                        self._create_testbed_controller(
                            guid, data, element_guids, recover)
                        if recover:
                            # Already programmed
                            blacklist_testbeds.add(guid)
                        else:
                            to_restart.add(guid)
                    # NOTE(review): bare except also catches KeyboardInterrupt
                    # and SystemExit; in recover mode they trigger the policy
                    except:
                        if recover:
                            policy = self._testbed_recovery_policy(guid, data=data)
                            if policy == DC.POLICY_RECOVER:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_recover.add(guid)
                            elif policy == DC.POLICY_RESTART:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_restart.add(guid)
                            else:
                                # Mark failed
                                self._failed_testbeds.add(guid)
                        else:
                            raise
        
        # queue programmable elements
        #  - that have not been programmed already (blacklist_testbeds)
        #  - including recovered or restarted testbeds
        #  - but those that have no unresolved netrefs
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                (testbed_guid, factory_id) = data.get_box_data(guid)
                if testbed_guid not in blacklist_testbeds:
                    element_guids.append(guid)

        # replace references to elements labels for its guid
        self._resolve_labels(data, data_guids, label_guids)
    
        # program testbed controllers
        # (elements whose testbed has no controller are skipped there)
        if element_guids:
            self._program_testbed_controllers(element_guids, data)
        
        return to_recover, to_restart
901
902     def _resolve_labels(self, data, data_guids, label_guids):
903         netrefs = self._netrefs
904         testbed_netrefs = self._testbed_netrefs
905         for guid in data_guids:
906             for name, value in data.get_attribute_data(guid):
907                 if isinstance(value, basestring):
908                     while True:
909                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
910                             label = match.group("label")
911                             if not label.startswith('GUID-'):
912                                 ref_guid = label_guids.get(label)
913                                 if ref_guid is not None:
914                                     value = value.replace(
915                                         match.group(),
916                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
917                                             guid = 'GUID-%d' % (ref_guid,),
918                                             expr = match.group("expr"),
919                                             label = label)
920                                     )
921                                     data.set_attribute_data(guid, name, value)
922                                     
923                                     # memorize which guid-attribute pairs require
924                                     # postprocessing, to avoid excessive controller-testbed
925                                     # communication at configuration time
926                                     # (which could require high-latency network I/O)
927                                     if not data.is_testbed_data(guid):
928                                         (testbed_guid, factory_id) = data.get_box_data(guid)
929                                         netrefs[(testbed_guid, guid)].add(name)
930                                     else:
931                                         testbed_netrefs[guid].add(name)
932                                     
933                                     break
934                         else:
935                             break
936
937     def _create_testbed_controller(self, guid, data, element_guids, recover):
938         (testbed_id, testbed_version) = data.get_testbed_data(guid)
939         deployment_config = self._deployment_config.get(guid)
940         
941         # deferred import because proxy needs
942         # our class definitions to define proxies
943         import nepi.util.proxy as proxy
944         
945         if deployment_config is None:
946             # need to create one
947             deployment_config = proxy.AccessConfiguration()
948             
949             for (name, value) in data.get_attribute_data(guid):
950                 if value is not None and deployment_config.has_attribute(name):
951                     # if any deployment config attribute has a netref, we can't
952                     # create this controller yet
953                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
954                         # remember to re-issue this one
955                         self._netreffed_testbeds.add(guid)
956                         return
957                     
958                     # copy deployment config attribute
959                     deployment_config.set_attribute_value(name, value)
960             
961             # commit config
962             self._deployment_config[guid] = deployment_config
963         
964         if deployment_config is not None:
965             # force recovery mode 
966             deployment_config.set_attribute_value("recover",recover)
967         
968         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
969                 deployment_config)
970         for (name, value) in data.get_attribute_data(guid):
971             testbed.defer_configure(name, value)
972         self._testbeds[guid] = testbed
973         if guid in self._netreffed_testbeds:
974             self._netreffed_testbeds.remove(guid)
975
976     def _program_testbed_controllers(self, element_guids, data):
977         def resolve_create_netref(data, guid, name, value): 
978             # Try to resolve create-time netrefs, if possible
979             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
980                 try:
981                     nuvalue = self.resolve_netref_value(value)
982                 except:
983                     # Any trouble means we're not in shape to resolve the netref yet
984                     nuvalue = None
985                 if nuvalue is not None:
986                     # Only if we succeed we remove the netref deferral entry
987                     value = nuvalue
988                     data.set_attribute_data(guid, name, value)
989                     if (testbed_guid, guid) in self._netrefs:
990                         self._netrefs[(testbed_guid, guid)].discard(name)
991             return value
992
993         for guid in element_guids:
994             (testbed_guid, factory_id) = data.get_box_data(guid)
995             testbed = self._testbeds.get(testbed_guid)
996             if testbed is not None:
997                 # create
998                 testbed.defer_create(guid, factory_id)
999                 # set attributes
1000                 for (name, value) in data.get_attribute_data(guid):
1001                     value = resolve_create_netref(data, guid, name, value)
1002                     testbed.defer_create_set(guid, name, value)
1003
1004         for guid in element_guids:
1005             (testbed_guid, factory_id) = data.get_box_data(guid)
1006             testbed = self._testbeds.get(testbed_guid)
1007             if testbed is not None:
1008                 # traces
1009                 for trace_id in data.get_trace_data(guid):
1010                     testbed.defer_add_trace(guid, trace_id)
1011                 # addresses
1012                 for (address, netprefix, broadcast) in data.get_address_data(guid):
1013                     if address != None:
1014                         testbed.defer_add_address(guid, address, netprefix, 
1015                                 broadcast)
1016                 # routes
1017                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
1018                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
1019                 # store connections data
1020                 for (connector_type_name, other_guid, other_connector_type_name) \
1021                         in data.get_connection_data(guid):
1022                     (other_testbed_guid, other_factory_id) = data.get_box_data(
1023                             other_guid)
1024                     if testbed_guid == other_testbed_guid:
1025                         # each testbed should take care of enforcing internal
1026                         # connection simmetry, so each connection is only
1027                         # added in one direction
1028                         testbed.defer_connect(guid, connector_type_name, 
1029                                 other_guid, other_connector_type_name)
1030
1031     def _program_testbed_cross_connections(self, data):
1032         data_guids = data.guids
1033         for guid in data_guids: 
1034             if not data.is_testbed_data(guid):
1035                 (testbed_guid, factory_id) = data.get_box_data(guid)
1036                 testbed = self._testbeds.get(testbed_guid)
1037                 if testbed is not None:
1038                     for (connector_type_name, cross_guid, cross_connector_type_name) \
1039                             in data.get_connection_data(guid):
1040                         (testbed_guid, factory_id) = data.get_box_data(guid)
1041                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1042                                 cross_guid)
1043                         if testbed_guid != cross_testbed_guid:
1044                             cross_testbed = self._testbeds[cross_testbed_guid]
1045                             cross_testbed_id = cross_testbed.testbed_id
1046                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1047                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1048                                     cross_connector_type_name)
1049                             # save cross data for later
1050                             self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1051                                     (testbed_guid, guid, cross_testbed_guid, cross_guid))
1052                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1053                                     cross_guid)
1054
1055     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1056         if testbed_guid not in self._cross_data:
1057             self._cross_data[testbed_guid] = dict()
1058         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1059             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1060         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1061
    def _get_cross_data(self, testbed_guid):
        """Collect the attributes of the cross-connected peers of a testbed.

        For every ``(cross_testbed_guid, cross_guid)`` recorded by
        _add_crossdata under ``testbed_guid``, the result is::

            {cross_testbed_guid: {cross_guid: {attr_name: value, ...}}}

        Each inner dict also carries the keys ``_guid``, ``_testbed_guid``,
        ``_testbed_id`` and ``_testbed_version``. Returns an empty dict when
        nothing was recorded for ``testbed_guid``.

        Remote calls go through the ``*_deferred`` testbed APIs: all
        attribute lists first, then all attribute values. Values are only
        undeferred at the end, presumably to batch round-trips to remote
        controllers. The call order matters and must be kept.
        """
        cross_data = dict()
        if not testbed_guid in self._cross_data:
            return cross_data

        # fetch attribute lists in one batch
        attribute_lists = dict()
        for cross_testbed_guid, guid_list in \
                self._cross_data[testbed_guid].iteritems():
            cross_testbed = self._testbeds[cross_testbed_guid]
            for cross_guid in guid_list:
                attribute_lists[(cross_testbed_guid, cross_guid)] = \
                    cross_testbed.get_attribute_list_deferred(cross_guid)

        # fetch attribute values in another batch
        for cross_testbed_guid, guid_list in \
                self._cross_data[testbed_guid].iteritems():
            cross_data[cross_testbed_guid] = dict()
            cross_testbed = self._testbeds[cross_testbed_guid]
            for cross_guid in guid_list:
                elem_cross_data = dict(
                    _guid = cross_guid,
                    _testbed_guid = cross_testbed_guid,
                    _testbed_id = cross_testbed.testbed_id,
                    _testbed_version = cross_testbed.testbed_version)
                cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
                # iterating the deferred list presumably forces its value
                attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
                for attr_name in attribute_list:
                    attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
                    elem_cross_data[attr_name] = attr_value
        
        # undefer all values - we'll have to serialize them probably later
        # (_undefer passes non-deferred values, like the _guid keys, through)
        for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
            for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
                for attr_name, attr_value in elem_cross_data.iteritems():
                    elem_cross_data[attr_name] = _undefer(attr_value)
        
        return cross_data
    # NOTE: the string literal below is a disabled draft of an ExperimentSuite
    # class. It is a no-op expression in this class body, not executed code.
    # As written it would not work:
    #   - ``xrange[...]`` indexes instead of calling;
    #   - ``self.repetitions`` lacks the leading underscore;
    #   - the ``while all(itertools.imap(...)`` line is missing a closing
    #     parenthesis, and the loop condition looks inverted (it waits while
    #     all guids are finished);
    #   - ``proxy`` and ``itertools`` are not imported at module level.
    """
class ExperimentSuite(object):
    def __init__(self, experiment_xml, access_config, repetitions,
            duration, wait_guids):
        self._experiment_xml = experiment_xml
        self._access_config = access_config
        self._experiments = dict()
        self._repetitions = repetitions
        self._duration = duration
        self._wait_guids = wait_guids
        self._current = None
        self._status = TS.STATUS_ZERO
        self._thread = None

    def start(self):
        self._status  = TS.STATUS_STARTED
        self._thread = threading.Thread(target = self._run_experiment_suite)
        self._thread.start()

    def shutdown(self):
        if self._thread:
            self._thread.join()
            self._thread = None

    def _run_experiment_suite(self):
        for i in xrange[0, self.repetitions]:
            self._current = i
            self._run_one_experiment()

    def _run_one_experiment(self):
        access_config = proxy.AccessConfiguration()
        for attr in self._access_config.attributes:
            access_config.set_attribute_value(attr.name, attr.value)
        access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
        root_dir = "%s_%d" % (
                access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
                self._current)
        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
        controller = proxy.create_experiment_controller(self._experiment_xml,
                access_config)
        self._experiments[self._current] = controller
        controller.start()
        started_at = time.time()
        # wait until all specified guids have finished execution
        if self._wait_guids:
            while all(itertools.imap(controller.is_finished, self._wait_guids):
                time.sleep(0.5)
        # wait until the minimum experiment duration time has elapsed 
        if self._duration:
            while (time.time() - started_at) < self._duration:
                time.sleep(0.5)
        controller.stop()
        #download results!!
        controller.shutdown()
    """
1155
1156
1157
1158
1159
1160
1161