Attempt at fixing NS3 in PL:
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import validation
6 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13 import collections
14 import functools
15 import time
16 import logging
17 logging.basicConfig()
18
# Matches a netref expression embedded in an attribute value, of the form
#   {#[<label>]<component>.[<attribute>]#}
# where <component> is optional and is one of ".addr[N]", ".route[N]" or
# ".trace[<id>]". Named groups: label, expr (everything between the label and
# the closing "#}"), component (with its leading dot) and attribute.
# The separator before "[<attribute>]" is a literal dot; it is escaped so that
# arbitrary characters are not accepted in its place.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# Template producing a netref string from a "guid" label and an "expr" tail.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into its kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
22
23 def _undefer(deferred):
24     if hasattr(deferred, '_get'):
25         return deferred._get()
26     else:
27         return deferred
28
29
class TestbedController(object):
    """
    Interface of a testbed controller.

    Apart from the identification properties, every operation in this class
    raises NotImplementedError; concrete behaviour is expected to come from
    subclasses. The ``defer_*`` methods record instructions, the ``do_*``
    methods carry out the successive deployment stages.
    """
    def __init__(self, testbed_id, testbed_version):
        """Store the testbed identifier and version."""
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Testbed identifier given at construction."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Testbed version given at construction."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements handled by this testbed (abstract)."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element """
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and 
        attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed elements
        are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Start the testbed (abstract)."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed (abstract)."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute ``name`` of element ``guid`` to ``value`` (abstract)."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Get the value of attribute ``name`` of element ``guid`` (abstract)."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """
        List attribute names of element ``guid`` (abstract).

        NOTE(review): ``filter_flags`` / ``exclude`` presumably select
        attributes by flag; confirm against a concrete implementation.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Factory id of element ``guid`` (abstract)."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Perform ``action`` on element ``guid`` at ``time`` (abstract)."""
        raise NotImplementedError

    def status(self, guid):
        """Status of element ``guid`` (abstract)."""
        raise NotImplementedError
    
    def testbed_status(self):
        """Status of the testbed as a whole (abstract)."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Query ``attribute`` of trace ``trace_id`` on element ``guid`` (abstract)."""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Shut the testbed down (abstract)."""
        raise NotImplementedError
228
229 class ExperimentController(object):
230     def __init__(self, experiment_xml, root_dir):
231         self._experiment_design_xml = experiment_xml
232         self._experiment_execute_xml = None
233         self._testbeds = dict()
234         self._deployment_config = dict()
235         self._netrefs = collections.defaultdict(set)
236         self._testbed_netrefs = collections.defaultdict(set)
237         self._cross_data = dict()
238         self._root_dir = root_dir
239         self._netreffed_testbeds = set()
240         self._guids_in_testbed_cache = dict()
241         self._failed_testbeds = set()
242         self._started_time = None
243         self._stopped_time = None
244         self._testbed_order = []
245       
246         self._logger = logging.getLogger('nepi.core.execute')
247         level = logging.ERROR
248         if os.environ.get("NEPI_CONTROLLER_LOGLEVEL", 
249                 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
250             level = logging.DEBUG
251         self._logger.setLevel(level)
252  
253         if experiment_xml is None and root_dir is not None:
254             # Recover
255             self.load_experiment_xml()
256             self.load_execute_xml()
257         else:
258             self.persist_experiment_xml()
259
    @property
    def experiment_design_xml(self):
        """Design XML, as given to the constructor or loaded from disk."""
        return self._experiment_design_xml
263
    @property
    def experiment_execute_xml(self):
        """Execute XML; None until it has been generated or loaded from disk."""
        return self._experiment_execute_xml
267
    @property
    def started_time(self):
        """``time.time()`` value recorded by start(), or None if never started."""
        return self._started_time
271
    @property
    def stopped_time(self):
        """``time.time()`` value recorded by stop(), or None if never stopped."""
        return self._stopped_time
275
276     @property
277     def guids(self):
278         guids = list()
279         for testbed_guid in self._testbeds.keys():
280             _guids = self._guids_in_testbed(testbed_guid)
281             if _guids:
282                 guids.extend(_guids)
283         return guids
284
285     def persist_experiment_xml(self):
286         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
287         f = open(xml_path, "w")
288         f.write(self._experiment_design_xml)
289         f.close()
290
291     def persist_execute_xml(self):
292         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
293         f = open(xml_path, "w")
294         f.write(self._experiment_execute_xml)
295         f.close()
296
297     def load_experiment_xml(self):
298         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
299         f = open(xml_path, "r")
300         self._experiment_design_xml = f.read()
301         f.close()
302
303     def load_execute_xml(self):
304         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
305         f = open(xml_path, "r")
306         self._experiment_execute_xml = f.read()
307         f.close()
308
309     def trace(self, guid, trace_id, attribute='value'):
310         testbed = self._testbed_for_guid(guid)
311         if testbed != None:
312             return testbed.trace(guid, trace_id, attribute)
313         raise RuntimeError("No element exists with guid %d" % guid)    
314
315     def traces_info(self):
316         traces_info = dict()
317         for guid, testbed in self._testbeds.iteritems():
318             tinfo = testbed.traces_info()
319             if tinfo:
320                 traces_info[guid] = testbed.traces_info()
321         return traces_info
322
323     @staticmethod
324     def _parallel(callables):
325         excs = []
326         def wrap(callable):
327             def wrapped(*p, **kw):
328                 try:
329                     callable(*p, **kw)
330                 except:
331                     logging.exception("Exception occurred in asynchronous thread:")
332                     excs.append(sys.exc_info())
333             try:
334                 wrapped = functools.wraps(callable)(wrapped)
335             except:
336                 # functools.partial not wrappable
337                 pass
338             return wrapped
339         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
340         for thread in threads:
341             thread.start()
342         for thread in threads:
343             thread.join()
344         for exc in excs:
345             eTyp, eVal, eLoc = exc
346             raise eTyp, eVal, eLoc
347
348     def start(self):
349         self._started_time = time.time() 
350         self._start()
351
    def _start(self, recover = False):
        """
        Deploy (or, with ``recover``, re-attach to) all testbeds and start them.

        Sequence, as implemented below:
          1. parse the execute XML (recover) or the design XML (fresh start);
          2. instantiate testbed controllers, then persist their access
             configuration (fresh start) or recover them (recover);
          3. run setup/create/connect/preconfigure in parallel on the
             testbeds to restart;
          4. if some testbeds depend on netrefs, resolve netrefs leniently,
             instantiate those testbeds and repeat step 3 for them;
          5. resolve netrefs leniently, program cross connections,
             do_configure in parallel, cross-connect sequentially,
             do_prestart in parallel;
          6. resolve netrefs strictly (unresolved references raise);
          7. on a fresh start, regenerate and persist the execute XML;
          8. start all restarted testbeds in parallel.

        Testbeds whose recovery raises are added to ``_failed_testbeds``.
        """
        parser = XmlExperimentParser()
        
        if recover:
            xml = self._experiment_execute_xml
        else:
            xml = self._experiment_design_xml
        data = parser.from_xml_to_data(xml)

        # instantiate testbed controllers
        to_recover, to_restart = self._init_testbed_controllers(data, recover)
        all_restart = set(to_restart)
        
        if not recover:
            # persist testbed connection data, for potential recovery
            self._persist_testbed_proxies()
        else:
            # recover recoverable controllers
            for guid in to_recover:
                try:
                    self._testbeds[guid].do_setup()
                    self._testbeds[guid].recover()
                except:
                    self._logger.exception("During recovery of testbed %s", guid)
                    
                    # Mark failed
                    self._failed_testbeds.add(guid)
    
        def steps_to_configure(self, allowed_guids):
            # Runs each pre-configuration stage in parallel over the testbeds
            # in allowed_guids; every stage completes on all of them before
            # the next stage begins.

            # perform setup in parallel for all test beds,
            # wait for all threads to finish

            self._logger.debug("ExperimentController: Starting parallel do_setup")
            self._parallel([testbed.do_setup 
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
       
            # perform create-connect in parallel, wait
            # (internal connections only)
            self._logger.debug("ExperimentController: Starting parallel do_create")
            self._parallel([testbed.do_create
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_connect_init")
            self._parallel([testbed.do_connect_init
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
            self._parallel([testbed.do_connect_compl
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])

            self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
            self._parallel([testbed.do_preconfigure
                            for guid,testbed in self._testbeds.iteritems()
                            if guid in allowed_guids])
            self._clear_caches()
            
            # Store testbed order (shutdown() walks this list in reverse)
            self._testbed_order.append(allowed_guids)

        steps_to_configure(self, to_restart)

        if self._netreffed_testbeds:
            self._logger.debug("ExperimentController: Resolving netreffed testbeds")
            # initially resolve netrefs
            self.do_netrefs(data, fail_if_undefined=False)
            
            # rinse and repeat, for netreffed testbeds
            # NOTE(review): netreffed_testbeds is assigned but not used below
            netreffed_testbeds = set(self._netreffed_testbeds)

            to_recover, to_restart = self._init_testbed_controllers(data, recover)
            all_restart.update(to_restart)
            
            if not recover:
                # persist testbed connection data, for potential recovery
                self._persist_testbed_proxies()
            else:
                # recover recoverable controllers
                for guid in to_recover:
                    try:
                        self._testbeds[guid].do_setup()
                        self._testbeds[guid].recover()
                    except:
                        self._logger.exception("During recovery of testbed %s", guid)

                        # Mark failed
                        self._failed_testbeds.add(guid)

            # configure dependent testbeds
            steps_to_configure(self, to_restart)
        
        # from here on all_restart holds controller objects, not guids
        all_restart = [ self._testbeds[guid] for guid in all_restart ]
            
        # intermediate netref step: resolve what can be resolved now,
        # unresolved references are tolerated until the final step below
        self._logger.debug("ExperimentController: Resolving do_netrefs")
        self.do_netrefs(data, fail_if_undefined=False)
       
        # Only now, that netref dependencies have been solved, it is safe to
        # program cross_connections
        self._logger.debug("ExperimentController: Programming testbed cross-connections")
        self._program_testbed_cross_connections(data)
 
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._logger.debug("ExperimentController: Starting parallel do_configure")
        self._parallel([testbed.do_configure
                        for testbed in all_restart])

        self._clear_caches()

        # cross-connect (cannot be done in parallel)
        self._logger.debug("ExperimentController: Starting cross-connect")
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_init(cross_data)
        for guid, testbed in self._testbeds.iteritems():
            cross_data = self._get_cross_data(guid)
            testbed.do_cross_connect_compl(cross_data)
       
        self._clear_caches()

        # Last chance to configure (parallel on all testbeds)
        self._logger.debug("ExperimentController: Starting parallel do_prestart")
        self._parallel([testbed.do_prestart
                        for testbed in all_restart])

        # final netref step, fail if anything's left unresolved
        self.do_netrefs(data, fail_if_undefined=True)
 
        self._clear_caches()
        
        if not recover:
            # update execution xml with execution-specific values
            # TODO: known bug - serialization cannot cope with every attribute
            # value (e.g. tun_key, which is non-ASCII)
            self._update_execute_xml()
            self.persist_execute_xml()

        # start experiment (parallel start on all testbeds)
        self._logger.debug("ExperimentController: Starting parallel do_start")
        self._parallel([testbed.start
                        for testbed in all_restart])

        self._clear_caches()
502
503     def _clear_caches(self):
504         # Cleaning cache for safety.
505         self._guids_in_testbed_cache = dict()
506
507     def _persist_testbed_proxies(self):
508         TRANSIENT = (DC.RECOVER,)
509         
510         # persist access configuration for all testbeds, so that
511         # recovery mode can reconnect to them if it becomes necessary
512         conf = ConfigParser.RawConfigParser()
513         for testbed_guid, testbed_config in self._deployment_config.iteritems():
514             testbed_guid = str(testbed_guid)
515             conf.add_section(testbed_guid)
516             for attr in testbed_config.get_attribute_list():
517                 if attr not in TRANSIENT:
518                     value = testbed_config.get_attribute_value(attr)
519                     if value is not None:
520                         conf.set(testbed_guid, attr, value)
521         
522         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
523         conf.write(f)
524         f.close()
525     
526     def _load_testbed_proxies(self):
527         TYPEMAP = {
528             Attribute.STRING : 'get',
529             Attribute.BOOL : 'getboolean',
530             Attribute.ENUM : 'get',
531             Attribute.DOUBLE : 'getfloat',
532             Attribute.INTEGER : 'getint',
533         }
534         
535         TRANSIENT = (DC.RECOVER,)
536         
537         # deferred import because proxy needs
538         # our class definitions to define proxies
539         import nepi.util.proxy as proxy
540         
541         conf = ConfigParser.RawConfigParser()
542         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
543         for testbed_guid in conf.sections():
544             testbed_config = proxy.AccessConfiguration()
545             testbed_guid = str(testbed_guid)
546             for attr in testbed_config.get_attribute_list():
547                 if attr not in TRANSIENT:
548                     getter = getattr(conf, TYPEMAP.get(
549                         testbed_config.get_attribute_type(attr),
550                         'get') )
551                     try:
552                         value = getter(testbed_guid, attr)
553                         testbed_config.set_attribute_value(attr, value)
554                     except ConfigParser.NoOptionError:
555                         # Leave default
556                         pass
557     
558     def _unpersist_testbed_proxies(self):
559         try:
560             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
561         except:
562             # Just print exceptions, this is just cleanup
563             self._logger.exception("Loading testbed configuration")
564
565     def _update_execute_xml(self):
566         # For all testbeds,
567         #   For all elements in testbed,
568         #       - gather immutable execute-readable attribuets lists
569         #         asynchronously
570         # Generate new design description from design xml
571         # (Wait for attributes lists - implicit syncpoint)
572         # For all testbeds,
573         #   For all elements in testbed,
574         #       - gather all immutable execute-readable attribute
575         #         values, asynchronously
576         # (Wait for attribute values - implicit syncpoint)
577         # For all testbeds,
578         #   For all elements in testbed,
579         #       - inject non-None values into new design
580         # Generate execute xml from new design
581
582         attribute_lists = dict(
583             (testbed_guid, collections.defaultdict(dict))
584             for testbed_guid in self._testbeds
585         )
586         
587         for testbed_guid, testbed in self._testbeds.iteritems():
588             guids = self._guids_in_testbed(testbed_guid)
589             for guid in guids:
590                 attribute_lists[testbed_guid][guid] = \
591                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
592         
593         parser = XmlExperimentParser()
594         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
595
596         attribute_values = dict(
597             (testbed_guid, collections.defaultdict(dict))
598             for testbed_guid in self._testbeds
599         )
600         
601         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
602             testbed = self._testbeds[testbed_guid]
603             for guid, attribute_list in testbed_attribute_lists.iteritems():
604                 attribute_list = _undefer(attribute_list)
605                 attribute_values[testbed_guid][guid] = dict(
606                     (attribute, testbed.get_deferred(guid, attribute))
607                     for attribute in attribute_list
608                 )
609         
610         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
611             for guid, attribute_values in testbed_attribute_values.iteritems():
612                 for attribute, value in attribute_values.iteritems():
613                     value = _undefer(value)
614                     if value is not None:
615                         execute_data.add_attribute_data(guid, attribute, value)
616         
617         self._experiment_execute_xml = parser.to_xml(data=execute_data)
618
619     def stop(self):
620        for testbed in self._testbeds.values():
621            testbed.stop()
622        self._unpersist_testbed_proxies()
623        self._stopped_time = time.time() 
624    
625     def recover(self):
626         # reload perviously persisted testbed access configurations
627         self._failed_testbeds.clear()
628         self._load_testbed_proxies()
629
630         # re-program testbeds that need recovery
631         self._start(recover = True)
632
633     def is_finished(self, guid):
634         testbed = self._testbed_for_guid(guid)
635         if testbed != None:
636             return testbed.status(guid) == AS.STATUS_FINISHED
637         raise RuntimeError("No element exists with guid %d" % guid)    
638     
639     def _testbed_recovery_policy(self, guid, data = None):
640         if data is None:
641             parser = XmlExperimentParser()
642             data = parser.from_xml_to_data(self._experiment_design_xml)
643         
644         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
645
646     def status(self, guid):
647         if guid in self._testbeds:
648             # guid is a testbed
649             # report testbed status
650             if guid in self._failed_testbeds:
651                 return TS.STATUS_FAILED
652             else:
653                 try:
654                     return self._testbeds[guid].status()
655                 except:
656                     return TS.STATUS_UNRESPONSIVE
657         else:
658             # guid is an element
659             testbed = self._testbed_for_guid(guid)
660             if testbed is not None:
661                 return testbed.status(guid)
662             else:
663                 return AS.STATUS_UNDETERMINED
664
665     def set(self, guid, name, value, time = TIME_NOW):
666         testbed = self._testbed_for_guid(guid)
667         if testbed != None:
668             testbed.set(guid, name, value, time)
669         else:
670             raise RuntimeError("No element exists with guid %d" % guid)    
671
672     def get(self, guid, name, time = TIME_NOW):
673         testbed = self._testbed_for_guid(guid)
674         if testbed != None:
675             return testbed.get(guid, name, time)
676         raise RuntimeError("No element exists with guid %d" % guid)    
677
678     def get_deferred(self, guid, name, time = TIME_NOW):
679         testbed = self._testbed_for_guid(guid)
680         if testbed != None:
681             return testbed.get_deferred(guid, name, time)
682         raise RuntimeError("No element exists with guid %d" % guid)    
683
684     def get_factory_id(self, guid):
685         testbed = self._testbed_for_guid(guid)
686         if testbed != None:
687             return testbed.get_factory_id(guid)
688         raise RuntimeError("No element exists with guid %d" % guid)    
689
690     def get_testbed_id(self, guid):
691         testbed = self._testbed_for_guid(guid)
692         if testbed != None:
693             return testbed.testbed_id
694         raise RuntimeError("No element exists with guid %d" % guid)    
695
696     def get_testbed_version(self, guid):
697         testbed = self._testbed_for_guid(guid)
698         if testbed != None:
699             return testbed.testbed_version
700         raise RuntimeError("No element exists with guid %d" % guid)    
701
702     def shutdown(self):
703         exceptions = list()
704         ordered_testbeds = set()
705
706         def shutdown_testbed(guid):
707             try:
708                 testbed = self._testbeds[guid]
709                 ordered_testbeds.add(guid)
710                 testbed.shutdown()
711             except:
712                 exceptions.append(sys.exc_info())
713                 
714         self._logger.debug("ExperimentController: Starting parallel shutdown")
715         
716         for testbed_guids in reversed(self._testbed_order):
717             self._logger.debug("ExperimentController: Shutting down %r", testbed_guids)
718             self._parallel([functools.partial(shutdown_testbed, guid)
719                             for guid in set(testbed_guids) - ordered_testbeds])
720         remaining_guids = set(self._testbeds) - ordered_testbeds
721         if remaining_guids:
722             self._logger.debug("ExperimentController: Shutted down %r", ordered_testbeds)
723             self._logger.debug("ExperimentController: Shutting down %r", remaining_guids)
724             self._parallel([functools.partial(shutdown_testbed, guid)
725                             for guid in remaining_guids])
726             
727         for exc_info in exceptions:
728             raise exc_info[0], exc_info[1], exc_info[2]
729
730     def _testbed_for_guid(self, guid):
731         for testbed_guid in self._testbeds.keys():
732             if guid in self._guids_in_testbed(testbed_guid):
733                 if testbed_guid in self._failed_testbeds:
734                     return None
735                 return self._testbeds[testbed_guid]
736         return None
737
738     def _guids_in_testbed(self, testbed_guid):
739         if testbed_guid not in self._testbeds:
740             return set()
741         if testbed_guid not in self._guids_in_testbed_cache:
742             self._guids_in_testbed_cache[testbed_guid] = \
743                 set(self._testbeds[testbed_guid].guids)
744         return self._guids_in_testbed_cache[testbed_guid]
745
746     @staticmethod
747     def _netref_component_split(component):
748         match = COMPONENT_PATTERN.match(component)
749         if match:
750             return match.group("kind"), match.group("index")
751         else:
752             return component, None
753
754     _NETREF_COMPONENT_GETTERS = {
755         'addr':
756             lambda testbed, guid, index, name: 
757                 testbed.get_address(guid, int(index), name),
758         'route' :
759             lambda testbed, guid, index, name: 
760                 testbed.get_route(guid, int(index), name),
761         'trace' :
762             lambda testbed, guid, index, name: 
763                 testbed.trace(guid, index, attribute = name),
764         '' : 
765             lambda testbed, guid, index, name: 
766                 testbed.get(guid, name),
767     }
768     
    def resolve_netref_value(self, value, failval = None):
        """
        Substitute the netref expressions found in ``value``.

        Each pass scans ``value`` for netrefs whose label is ``GUID-<n>``
        (n non-zero), looks the referenced element up in the known testbeds
        and replaces every occurrence of the netref text with the value
        obtained. After a substitution the scan restarts on the new string.

        Returns the substituted string; ``failval`` if nothing was
        substituted or if a netref could not be resolved by any testbed.
        Raises ValueError for an unknown component kind.
        """
        rv = failval
        while True:
            for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
                label = match.group("label")
                # labels not of the form GUID-<n> are skipped here
                if label.startswith('GUID-'):
                    ref_guid = int(label[5:])
                    if ref_guid:
                        expr = match.group("expr")
                        component = (match.group("component") or "")[1:] # skip the dot
                        attribute = match.group("attribute")
                        
                        # split compound components into component kind and index
                        # eg: 'addr[0]' -> ('addr', '0')
                        component, component_index = self._netref_component_split(component)

                        # find object and resolve expression
                        for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
                            if component not in self._NETREF_COMPONENT_GETTERS:
                                raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
                            elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
                                pass
                            else:
                                ref_value = self._NETREF_COMPONENT_GETTERS[component](
                                    ref_testbed, ref_guid, component_index, attribute)
                                # a falsy result counts as "not resolved yet"
                                if ref_value:
                                    value = rv = value.replace(match.group(), ref_value)
                                    break
                        else:
                            # unresolvable netref
                            return failval
                        # value changed: restart the scan on the new string
                        break
            else:
                # full scan without any substitution: done
                break
        return rv
804     
805     def do_netrefs(self, data, fail_if_undefined = False):
806         # element netrefs
807         for (testbed_guid, guid), attrs in self._netrefs.items():
808             testbed = self._testbeds.get(testbed_guid)
809             if testbed is not None:
810                 for name in set(attrs):
811                     value = testbed.get(guid, name)
812                     if isinstance(value, basestring):
813                         ref_value = self.resolve_netref_value(value)
814                         if ref_value is not None:
815                             testbed.set(guid, name, ref_value)
816                             attrs.remove(name)
817                         elif fail_if_undefined:
818                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
819                 if not attrs:
820                     del self._netrefs[(testbed_guid, guid)]
821         
822         # testbed netrefs
823         for testbed_guid, attrs in self._testbed_netrefs.items():
824             tb_data = dict(data.get_attribute_data(testbed_guid))
825             if data:
826                 for name in set(attrs):
827                     value = tb_data.get(name)
828                     if isinstance(value, basestring):
829                         ref_value = self.resolve_netref_value(value)
830                         if ref_value is not None:
831                             data.set_attribute_data(testbed_guid, name, ref_value)
832                             attrs.remove(name)
833                         elif fail_if_undefined:
834                             raise ValueError, "Unresolvable netref in: %r" % (value,)
835                 if not attrs:
836                     del self._testbed_netrefs[testbed_guid]
837         
838
839     def _init_testbed_controllers(self, data, recover = False):
840         blacklist_testbeds = set(self._testbeds)
841         element_guids = list()
842         label_guids = dict()
843         data_guids = data.guids
844         to_recover = set()
845         to_restart = set()
846
847         # gather label associations
848         for guid in data_guids:
849             if not data.is_testbed_data(guid):
850                 (testbed_guid, factory_id) = data.get_box_data(guid)
851                 label = data.get_attribute_data(guid, "label")
852                 if label is not None:
853                     if label in label_guids:
854                         raise RuntimeError, "Label %r is not unique" % (label,)
855                     label_guids[label] = guid
856
857         # create testbed controllers
858         for guid in data_guids:
859             if data.is_testbed_data(guid):
860                 if guid not in self._testbeds:
861                     try:
862                         self._create_testbed_controller(
863                             guid, data, element_guids, recover)
864                         if recover:
865                             # Already programmed
866                             blacklist_testbeds.add(guid)
867                         else:
868                             to_restart.add(guid)
869                     except:
870                         if recover:
871                             policy = self._testbed_recovery_policy(guid, data=data)
872                             if policy == DC.POLICY_RECOVER:
873                                 self._create_testbed_controller(
874                                     guid, data, element_guids, False)
875                                 to_recover.add(guid)
876                             elif policy == DC.POLICY_RESTART:
877                                 self._create_testbed_controller(
878                                     guid, data, element_guids, False)
879                                 to_restart.add(guid)
880                             else:
881                                 # Mark failed
882                                 self._failed_testbeds.add(guid)
883                         else:
884                             raise
885         
886         # queue programmable elements
887         #  - that have not been programmed already (blacklist_testbeds)
888         #  - including recovered or restarted testbeds
889         #  - but those that have no unresolved netrefs
890         for guid in data_guids:
891             if not data.is_testbed_data(guid):
892                 (testbed_guid, factory_id) = data.get_box_data(guid)
893                 if testbed_guid not in blacklist_testbeds:
894                     element_guids.append(guid)
895
896         # replace references to elements labels for its guid
897         self._resolve_labels(data, data_guids, label_guids)
898     
899         # program testbed controllers
900         if element_guids:
901             self._program_testbed_controllers(element_guids, data)
902         
903         return to_recover, to_restart
904
905     def _resolve_labels(self, data, data_guids, label_guids):
906         netrefs = self._netrefs
907         testbed_netrefs = self._testbed_netrefs
908         for guid in data_guids:
909             for name, value in data.get_attribute_data(guid):
910                 if isinstance(value, basestring):
911                     while True:
912                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
913                             label = match.group("label")
914                             if not label.startswith('GUID-'):
915                                 ref_guid = label_guids.get(label)
916                                 if ref_guid is not None:
917                                     value = value.replace(
918                                         match.group(),
919                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
920                                             guid = 'GUID-%d' % (ref_guid,),
921                                             expr = match.group("expr"),
922                                             label = label)
923                                     )
924                                     data.set_attribute_data(guid, name, value)
925                                     
926                                     # memorize which guid-attribute pairs require
927                                     # postprocessing, to avoid excessive controller-testbed
928                                     # communication at configuration time
929                                     # (which could require high-latency network I/O)
930                                     if not data.is_testbed_data(guid):
931                                         (testbed_guid, factory_id) = data.get_box_data(guid)
932                                         netrefs[(testbed_guid, guid)].add(name)
933                                     else:
934                                         testbed_netrefs[guid].add(name)
935                                     
936                                     break
937                         else:
938                             break
939
940     def _create_testbed_controller(self, guid, data, element_guids, recover):
941         (testbed_id, testbed_version) = data.get_testbed_data(guid)
942         deployment_config = self._deployment_config.get(guid)
943         
944         # deferred import because proxy needs
945         # our class definitions to define proxies
946         import nepi.util.proxy as proxy
947         
948         if deployment_config is None:
949             # need to create one
950             deployment_config = proxy.AccessConfiguration()
951             
952             for (name, value) in data.get_attribute_data(guid):
953                 if value is not None and deployment_config.has_attribute(name):
954                     # if any deployment config attribute has a netref, we can't
955                     # create this controller yet
956                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
957                         # remember to re-issue this one
958                         self._netreffed_testbeds.add(guid)
959                         return
960                     
961                     # copy deployment config attribute
962                     deployment_config.set_attribute_value(name, value)
963             
964             # commit config
965             self._deployment_config[guid] = deployment_config
966         
967         if deployment_config is not None:
968             # force recovery mode 
969             deployment_config.set_attribute_value("recover",recover)
970         
971         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
972                 deployment_config)
973         for (name, value) in data.get_attribute_data(guid):
974             testbed.defer_configure(name, value)
975         self._testbeds[guid] = testbed
976         if guid in self._netreffed_testbeds:
977             self._netreffed_testbeds.remove(guid)
978
979     def _program_testbed_controllers(self, element_guids, data):
980         def resolve_create_netref(data, guid, name, value): 
981             # Try to resolve create-time netrefs, if possible
982             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
983                 try:
984                     nuvalue = self.resolve_netref_value(value)
985                 except:
986                     # Any trouble means we're not in shape to resolve the netref yet
987                     nuvalue = None
988                 if nuvalue is not None:
989                     # Only if we succeed we remove the netref deferral entry
990                     value = nuvalue
991                     data.set_attribute_data(guid, name, value)
992                     if (testbed_guid, guid) in self._netrefs:
993                         self._netrefs[(testbed_guid, guid)].discard(name)
994             return value
995
996         for guid in element_guids:
997             (testbed_guid, factory_id) = data.get_box_data(guid)
998             testbed = self._testbeds.get(testbed_guid)
999             if testbed is not None:
1000                 # create
1001                 testbed.defer_create(guid, factory_id)
1002                 # set attributes
1003                 for (name, value) in data.get_attribute_data(guid):
1004                     value = resolve_create_netref(data, guid, name, value)
1005                     testbed.defer_create_set(guid, name, value)
1006
1007         for guid in element_guids:
1008             (testbed_guid, factory_id) = data.get_box_data(guid)
1009             testbed = self._testbeds.get(testbed_guid)
1010             if testbed is not None:
1011                 # traces
1012                 for trace_id in data.get_trace_data(guid):
1013                     testbed.defer_add_trace(guid, trace_id)
1014                 # addresses
1015                 for (address, netprefix, broadcast) in data.get_address_data(guid):
1016                     if address != None:
1017                         testbed.defer_add_address(guid, address, netprefix, 
1018                                 broadcast)
1019                 # routes
1020                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
1021                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
1022                 # store connections data
1023                 for (connector_type_name, other_guid, other_connector_type_name) \
1024                         in data.get_connection_data(guid):
1025                     (other_testbed_guid, other_factory_id) = data.get_box_data(
1026                             other_guid)
1027                     if testbed_guid == other_testbed_guid:
1028                         # each testbed should take care of enforcing internal
1029                         # connection simmetry, so each connection is only
1030                         # added in one direction
1031                         testbed.defer_connect(guid, connector_type_name, 
1032                                 other_guid, other_connector_type_name)
1033
1034     def _program_testbed_cross_connections(self, data):
1035         data_guids = data.guids
1036         for guid in data_guids: 
1037             if not data.is_testbed_data(guid):
1038                 (testbed_guid, factory_id) = data.get_box_data(guid)
1039                 testbed = self._testbeds.get(testbed_guid)
1040                 if testbed is not None:
1041                     for (connector_type_name, cross_guid, cross_connector_type_name) \
1042                             in data.get_connection_data(guid):
1043                         (testbed_guid, factory_id) = data.get_box_data(guid)
1044                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1045                                 cross_guid)
1046                         if testbed_guid != cross_testbed_guid:
1047                             cross_testbed = self._testbeds[cross_testbed_guid]
1048                             cross_testbed_id = cross_testbed.testbed_id
1049                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1050                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1051                                     cross_connector_type_name)
1052                             # save cross data for later
1053                             self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1054                                     (testbed_guid, guid, cross_testbed_guid, cross_guid))
1055                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1056                                     cross_guid)
1057
1058     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1059         if testbed_guid not in self._cross_data:
1060             self._cross_data[testbed_guid] = dict()
1061         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1062             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1063         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1064
1065     def _get_cross_data(self, testbed_guid):
1066         cross_data = dict()
1067         if not testbed_guid in self._cross_data:
1068             return cross_data
1069
1070         # fetch attribute lists in one batch
1071         attribute_lists = dict()
1072         for cross_testbed_guid, guid_list in \
1073                 self._cross_data[testbed_guid].iteritems():
1074             cross_testbed = self._testbeds[cross_testbed_guid]
1075             for cross_guid in guid_list:
1076                 attribute_lists[(cross_testbed_guid, cross_guid)] = \
1077                     cross_testbed.get_attribute_list_deferred(cross_guid)
1078
1079         # fetch attribute values in another batch
1080         for cross_testbed_guid, guid_list in \
1081                 self._cross_data[testbed_guid].iteritems():
1082             cross_data[cross_testbed_guid] = dict()
1083             cross_testbed = self._testbeds[cross_testbed_guid]
1084             for cross_guid in guid_list:
1085                 elem_cross_data = dict(
1086                     _guid = cross_guid,
1087                     _testbed_guid = cross_testbed_guid,
1088                     _testbed_id = cross_testbed.testbed_id,
1089                     _testbed_version = cross_testbed.testbed_version)
1090                 cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
1091                 attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
1092                 for attr_name in attribute_list:
1093                     attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
1094                     elem_cross_data[attr_name] = attr_value
1095         
1096         # undefer all values - we'll have to serialize them probably later
1097         for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
1098             for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
1099                 for attr_name, attr_value in elem_cross_data.iteritems():
1100                     elem_cross_data[attr_name] = _undefer(attr_value)
1101         
1102         return cross_data
    # NOTE: the bare string literal below holds a disabled draft of an
    # ExperimentSuite class; being a string, it is never executed.
    # Problems visible in the draft that must be fixed before enabling it:
    #   - `xrange[0, self.repetitions]` subscripts xrange instead of calling
    #     it, and reads `self.repetitions` although __init__ stores the
    #     value as `self._repetitions`;
    #   - the `while all(itertools.imap(...):` line is missing a closing
    #     parenthesis;
    #   - `itertools` and `proxy` are used but are not among the imports at
    #     the top of this module.
    # NOTE(review): the wait loop appears to spin while all wait guids ARE
    # finished; presumably it should wait until they are - confirm.
    """
class ExperimentSuite(object):
    def __init__(self, experiment_xml, access_config, repetitions,
            duration, wait_guids):
        self._experiment_xml = experiment_xml
        self._access_config = access_config
        self._experiments = dict()
        self._repetitions = repetitions
        self._duration = duration
        self._wait_guids = wait_guids
        self._current = None
        self._status = TS.STATUS_ZERO
        self._thread = None

    def start(self):
        self._status  = TS.STATUS_STARTED
        self._thread = threading.Thread(target = self._run_experiment_suite)
        self._thread.start()

    def shutdown(self):
        if self._thread:
            self._thread.join()
            self._thread = None

    def _run_experiment_suite(self):
        for i in xrange[0, self.repetitions]:
            self._current = i
            self._run_one_experiment()

    def _run_one_experiment(self):
        access_config = proxy.AccessConfiguration()
        for attr in self._access_config.attributes:
            access_config.set_attribute_value(attr.name, attr.value)
        access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
        root_dir = "%s_%d" % (
                access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
                self._current)
        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
        controller = proxy.create_experiment_controller(self._experiment_xml,
                access_config)
        self._experiments[self._current] = controller
        controller.start()
        started_at = time.time()
        # wait until all specified guids have finished execution
        if self._wait_guids:
            while all(itertools.imap(controller.is_finished, self._wait_guids):
                time.sleep(0.5)
        # wait until the minimum experiment duration time has elapsed 
        if self._duration:
            while (time.time() - started_at) < self._duration:
                time.sleep(0.5)
        controller.stop()
        #download results!!
        controller.shutdown()
    """
1158
1159
1160
1161
1162
1163
1164