Added routes to OMF nodes
[nepi.git] / src / nepi / core / execute.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.core.attributes import Attribute, AttributesMap
4 from nepi.util import validation
5 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
6 from nepi.util.parser._xml import XmlExperimentParser
7 import sys
8 import re
9 import threading
10 import ConfigParser
11 import os
12 import collections
13 import functools
14 import time
15 import logging
logging.basicConfig()

# Netref syntax: {#[<label>]<component>.[<attribute>]#}
#   label      - element label, or GUID-<n> for a direct guid reference
#   component  - optional .addr[i], .route[i] or .trace[trace_id]
#   attribute  - attribute name to fetch
# e.g. "{#[GUID-3].addr[0].[Address]#}" or "{#[node1].[hostname]#}".
# The separator before "[attribute]" is a literal dot (escaped: it used to
# be an unescaped "." that accepted any character there).
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?\.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template that rebuilds a netref from a label/guid and a matched "expr".
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a component such as "addr[0]" into kind="addr", index="0".
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
21
22 def _undefer(deferred):
23     if hasattr(deferred, '_get'):
24         return deferred._get()
25     else:
26         return deferred
27
28
class TestbedController(object):
    """Interface that every testbed controller implementation provides.

    Every method here only raises NotImplementedError; concrete testbeds
    override them. The ``defer_*`` methods record instructions (elements,
    attributes, connections, traces, addresses, routes) and the ``do_*``
    phase methods carry them out. ExperimentController drives the phases in
    this order: do_setup, do_create, do_connect_init, do_connect_compl,
    do_preconfigure, do_configure, do_cross_connect_init,
    do_cross_connect_compl, do_prestart, then start.
    """

    def __init__(self, testbed_id, testbed_version):
        """
        Params:

            testbed_id: identifier of the testbed type
            testbed_version: version of that testbed type
        """
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """Identifier of the testbed type, as given to the constructor."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """Version of the testbed type, as given to the constructor."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements managed by this testbed."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element """
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, 
            metric = 0, device = None):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and 
        attributes set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed elements
        are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before do_start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections 
        between different testbed elements is performed
        """
        raise NotImplementedError

    def start(self):
        """Start the testbed (called after all configuration phases)."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute *name* of element *guid* to *value*; *time* is
        forwarded by callers and defaults to TIME_NOW."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Return attribute *name* of element *guid*; *time* defaults to
        TIME_NOW."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """Return the attribute names of element *guid*, optionally filtered
        by *filter_flags* (*exclude* presumably inverts the filter)."""
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id element *guid* was created from."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Perform *action* on element *guid*; *time* presumably selects when."""
        raise NotImplementedError

    def status(self, guid):
        """Return the status of element *guid* (ExperimentController compares
        it against ApplicationStatus values)."""
        raise NotImplementedError
    
    def testbed_status(self):
        """Return the status of the testbed as a whole (presumably a
        TestbedStatus value)."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """Return *attribute* of trace *trace_id* of element *guid*."""
        raise NotImplementedError

    def traces_info(self):
        """ dictionary of dictionaries:
            traces_info = dict({
                guid = dict({
                    trace_id = dict({
                            host = host,
                            filepath = filepath,
                            filesize = size in bytes,
                        })
                })
            })"""
        raise NotImplementedError

    def shutdown(self):
        """Release every resource held by the testbed."""
        raise NotImplementedError
228
229 class ExperimentController(object):
230     def __init__(self, experiment_xml, root_dir):
231         self._experiment_design_xml = experiment_xml
232         self._experiment_execute_xml = None
233         self._testbeds = dict()
234         self._deployment_config = dict()
235         self._netrefs = collections.defaultdict(set)
236         self._testbed_netrefs = collections.defaultdict(set)
237         self._cross_data = dict()
238         self._root_dir = root_dir
239         self._netreffed_testbeds = set()
240         self._guids_in_testbed_cache = dict()
241         self._failed_testbeds = set()
242         self._started_time = None
243         self._stopped_time = None
244         self._testbed_order = []
245       
246         self._logger = logging.getLogger('nepi.core.execute')
247         level = logging.ERROR
248         if os.environ.get("NEPI_CONTROLLER_LOGLEVEL", 
249                 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
250             level = logging.DEBUG
251         self._logger.setLevel(level)
252  
253         if experiment_xml is None and root_dir is not None:
254             # Recover
255             self.load_experiment_xml()
256             self.load_execute_xml()
257         else:
258             self.persist_experiment_xml()
259
260     @property
261     def experiment_design_xml(self):
262         return self._experiment_design_xml
263
264     @property
265     def experiment_execute_xml(self):
266         return self._experiment_execute_xml
267
268     @property
269     def started_time(self):
270         return self._started_time
271
272     @property
273     def stopped_time(self):
274         return self._stopped_time
275
276     @property
277     def guids(self):
278         guids = list()
279         for testbed_guid in self._testbeds.keys():
280             _guids = self._guids_in_testbed(testbed_guid)
281             if _guids:
282                 guids.extend(_guids)
283         return guids
284
285     def persist_experiment_xml(self):
286         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
287         f = open(xml_path, "w")
288         f.write(self._experiment_design_xml)
289         f.close()
290
291     def persist_execute_xml(self):
292         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
293         f = open(xml_path, "w")
294         f.write(self._experiment_execute_xml)
295         f.close()
296
297     def load_experiment_xml(self):
298         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
299         f = open(xml_path, "r")
300         self._experiment_design_xml = f.read()
301         f.close()
302
303     def load_execute_xml(self):
304         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
305         f = open(xml_path, "r")
306         self._experiment_execute_xml = f.read()
307         f.close()
308
309     def trace(self, guid, trace_id, attribute='value'):
310         testbed = self._testbed_for_guid(guid)
311         if testbed != None:
312             return testbed.trace(guid, trace_id, attribute)
313         raise RuntimeError("No element exists with guid %d" % guid)    
314
315     def traces_info(self):
316         traces_info = dict()
317         for guid, testbed in self._testbeds.iteritems():
318             tinfo = testbed.traces_info()
319             if tinfo:
320                 traces_info[guid] = testbed.traces_info()
321         return traces_info
322
323     @staticmethod
324     def _parallel(callables):
325         excs = []
326         def wrap(callable):
327             def wrapped(*p, **kw):
328                 try:
329                     callable(*p, **kw)
330                 except:
331                     logging.exception("Exception occurred in asynchronous thread:")
332                     excs.append(sys.exc_info())
333             try:
334                 wrapped = functools.wraps(callable)(wrapped)
335             except:
336                 # functools.partial not wrappable
337                 pass
338             return wrapped
339         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
340         for thread in threads:
341             thread.start()
342         for thread in threads:
343             thread.join()
344         for exc in excs:
345             eTyp, eVal, eLoc = exc
346             raise eTyp, eVal, eLoc
347
348     def start(self):
349         self._started_time = time.time() 
350         self._start()
351
352     def _start(self, recover = False):
353         parser = XmlExperimentParser()
354         
355         if recover:
356             xml = self._experiment_execute_xml
357         else:
358             xml = self._experiment_design_xml
359         data = parser.from_xml_to_data(xml)
360
361         # instantiate testbed controllers
362         to_recover, to_restart = self._init_testbed_controllers(data, recover)
363         all_restart = set(to_restart)
364         
365         if not recover:
366             # persist testbed connection data, for potential recovery
367             self._persist_testbed_proxies()
368         else:
369             # recover recoverable controllers
370             for guid in to_recover:
371                 try:
372                     self._testbeds[guid].do_setup()
373                     self._testbeds[guid].recover()
374                 except:
375                     self._logger.exception("During recovery of testbed %s", guid)
376                     
377                     # Mark failed
378                     self._failed_testbeds.add(guid)
379     
380         def steps_to_configure(self, allowed_guids):
381             # perform setup in parallel for all test beds,
382             # wait for all threads to finish
383
384             self._logger.debug("ExperimentController: Starting parallel do_setup")
385             self._parallel([testbed.do_setup 
386                             for guid,testbed in self._testbeds.iteritems()
387                             if guid in allowed_guids])
388        
389             # perform create-connect in parallel, wait
390             # (internal connections only)
391             self._logger.debug("ExperimentController: Starting parallel do_create")
392             self._parallel([testbed.do_create
393                             for guid,testbed in self._testbeds.iteritems()
394                             if guid in allowed_guids])
395
396             self._logger.debug("ExperimentController: Starting parallel do_connect_init")
397             self._parallel([testbed.do_connect_init
398                             for guid,testbed in self._testbeds.iteritems()
399                             if guid in allowed_guids])
400
401             self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
402             self._parallel([testbed.do_connect_compl
403                             for guid,testbed in self._testbeds.iteritems()
404                             if guid in allowed_guids])
405
406             self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
407             self._parallel([testbed.do_preconfigure
408                             for guid,testbed in self._testbeds.iteritems()
409                             if guid in allowed_guids])
410             self._clear_caches()
411             
412             # Store testbed order
413             self._testbed_order.append(allowed_guids)
414
415         steps_to_configure(self, to_restart)
416
417         if self._netreffed_testbeds:
418             self._logger.debug("ExperimentController: Resolving netreffed testbeds")
419             # initally resolve netrefs
420             self.do_netrefs(data, fail_if_undefined=False)
421             
422             # rinse and repeat, for netreffed testbeds
423             netreffed_testbeds = set(self._netreffed_testbeds)
424
425             to_recover, to_restart = self._init_testbed_controllers(data, recover)
426             all_restart.update(to_restart)
427             
428             if not recover:
429                 # persist testbed connection data, for potential recovery
430                 self._persist_testbed_proxies()
431             else:
432                 # recover recoverable controllers
433                 for guid in to_recover:
434                     try:
435                         self._testbeds[guid].do_setup()
436                         self._testbeds[guid].recover()
437                     except:
438                         self._logger.exception("During recovery of testbed %s", guid)
439
440                         # Mark failed
441                         self._failed_testbeds.add(guid)
442
443             # configure dependant testbeds
444             steps_to_configure(self, to_restart)
445         
446         all_restart = [ self._testbeds[guid] for guid in all_restart ]
447             
448         # final netref step, fail if anything's left unresolved
449         self._logger.debug("ExperimentController: Resolving do_netrefs")
450         self.do_netrefs(data, fail_if_undefined=False)
451        
452         # Only now, that netref dependencies have been solve, it is safe to
453         # program cross_connections
454         self._logger.debug("ExperimentController: Programming testbed cross-connections")
455         self._program_testbed_cross_connections(data)
456  
457         # perform do_configure in parallel for al testbeds
458         # (it's internal configuration for each)
459         self._logger.debug("ExperimentController: Starting parallel do_configure")
460         self._parallel([testbed.do_configure
461                         for testbed in all_restart])
462
463         self._clear_caches()
464
465         #print >>sys.stderr, "DO IT"
466         #import time
467         #time.sleep(60)
468         
469         # cross-connect (cannot be done in parallel)
470         self._logger.debug("ExperimentController: Starting cross-connect")
471         for guid, testbed in self._testbeds.iteritems():
472             cross_data = self._get_cross_data(guid)
473             testbed.do_cross_connect_init(cross_data)
474         for guid, testbed in self._testbeds.iteritems():
475             cross_data = self._get_cross_data(guid)
476             testbed.do_cross_connect_compl(cross_data)
477        
478         self._clear_caches()
479
480         # Last chance to configure (parallel on all testbeds)
481         self._logger.debug("ExperimentController: Starting parallel do_prestart")
482         self._parallel([testbed.do_prestart
483                         for testbed in all_restart])
484
485         # final netref step, fail if anything's left unresolved
486         self.do_netrefs(data, fail_if_undefined=True)
487  
488         self._clear_caches()
489         
490         if not recover:
491             # update execution xml with execution-specific values
492             # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
493             self._update_execute_xml()
494             self.persist_execute_xml()
495
496         # start experiment (parallel start on all testbeds)
497         self._logger.debug("ExperimentController: Starting parallel do_start")
498         self._parallel([testbed.start
499                         for testbed in all_restart])
500
501         self._clear_caches()
502
503     def _clear_caches(self):
504         # Cleaning cache for safety.
505         self._guids_in_testbed_cache = dict()
506
507     def _persist_testbed_proxies(self):
508         TRANSIENT = (DC.RECOVER,)
509         
510         # persist access configuration for all testbeds, so that
511         # recovery mode can reconnect to them if it becomes necessary
512         conf = ConfigParser.RawConfigParser()
513         for testbed_guid, testbed_config in self._deployment_config.iteritems():
514             testbed_guid = str(testbed_guid)
515             conf.add_section(testbed_guid)
516             for attr in testbed_config.get_attribute_list():
517                 if attr not in TRANSIENT:
518                     value = testbed_config.get_attribute_value(attr)
519                     if value is not None:
520                         conf.set(testbed_guid, attr, value)
521         
522         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
523         conf.write(f)
524         f.close()
525     
526     def _load_testbed_proxies(self):
527         TYPEMAP = {
528             Attribute.STRING : 'get',
529             Attribute.BOOL : 'getboolean',
530             Attribute.ENUM : 'get',
531             Attribute.DOUBLE : 'getfloat',
532             Attribute.INTEGER : 'getint',
533         }
534         
535         TRANSIENT = (DC.RECOVER,)
536         
537         # deferred import because proxy needs
538         # our class definitions to define proxies
539         import nepi.util.proxy as proxy
540         
541         conf = ConfigParser.RawConfigParser()
542         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
543         for testbed_guid in conf.sections():
544             testbed_config = proxy.AccessConfiguration()
545             testbed_guid = str(testbed_guid)
546             for attr in testbed_config.get_attribute_list():
547                 if attr not in TRANSIENT:
548                     getter = getattr(conf, TYPEMAP.get(
549                         testbed_config.get_attribute_type(attr),
550                         'get') )
551                     try:
552                         value = getter(testbed_guid, attr)
553                         testbed_config.set_attribute_value(attr, value)
554                     except ConfigParser.NoOptionError:
555                         # Leave default
556                         pass
557     
558     def _unpersist_testbed_proxies(self):
559         try:
560             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
561         except:
562             # Just print exceptions, this is just cleanup
563             self._logger.exception("Loading testbed configuration")
564
565     def _update_execute_xml(self):
566         # For all testbeds,
567         #   For all elements in testbed,
568         #       - gather immutable execute-readable attribuets lists
569         #         asynchronously
570         # Generate new design description from design xml
571         # (Wait for attributes lists - implicit syncpoint)
572         # For all testbeds,
573         #   For all elements in testbed,
574         #       - gather all immutable execute-readable attribute
575         #         values, asynchronously
576         # (Wait for attribute values - implicit syncpoint)
577         # For all testbeds,
578         #   For all elements in testbed,
579         #       - inject non-None values into new design
580         # Generate execute xml from new design
581
582         attribute_lists = dict(
583             (testbed_guid, collections.defaultdict(dict))
584             for testbed_guid in self._testbeds
585         )
586         
587         for testbed_guid, testbed in self._testbeds.iteritems():
588             guids = self._guids_in_testbed(testbed_guid)
589             for guid in guids:
590                 attribute_lists[testbed_guid][guid] = \
591                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
592         
593         parser = XmlExperimentParser()
594         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
595
596         attribute_values = dict(
597             (testbed_guid, collections.defaultdict(dict))
598             for testbed_guid in self._testbeds
599         )
600         
601         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
602             testbed = self._testbeds[testbed_guid]
603             for guid, attribute_list in testbed_attribute_lists.iteritems():
604                 attribute_list = _undefer(attribute_list)
605                 attribute_values[testbed_guid][guid] = dict(
606                     (attribute, testbed.get_deferred(guid, attribute))
607                     for attribute in attribute_list
608                 )
609         
610         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
611             for guid, attribute_values in testbed_attribute_values.iteritems():
612                 for attribute, value in attribute_values.iteritems():
613                     value = _undefer(value)
614                     if value is not None:
615                         execute_data.add_attribute_data(guid, attribute, value)
616         
617         self._experiment_execute_xml = parser.to_xml(data=execute_data)
618
619     def stop(self):
620        for testbed in self._testbeds.values():
621            testbed.stop()
622        self._unpersist_testbed_proxies()
623        self._stopped_time = time.time() 
624    
625     def recover(self):
626         # reload perviously persisted testbed access configurations
627         self._failed_testbeds.clear()
628         self._load_testbed_proxies()
629
630         # re-program testbeds that need recovery
631         self._start(recover = True)
632
633     def is_finished(self, guid):
634         testbed = self._testbed_for_guid(guid)
635         if testbed != None:
636             return testbed.status(guid) == AS.STATUS_FINISHED
637         raise RuntimeError("No element exists with guid %d" % guid)    
638     
639     def _testbed_recovery_policy(self, guid, data = None):
640         if data is None:
641             parser = XmlExperimentParser()
642             data = parser.from_xml_to_data(self._experiment_design_xml)
643         
644         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
645
646     def status(self, guid):
647         if guid in self._testbeds:
648             # guid is a testbed
649             # report testbed status
650             if guid in self._failed_testbeds:
651                 return TS.STATUS_FAILED
652             else:
653                 try:
654                     return self._testbeds[guid].status()
655                 except:
656                     return TS.STATUS_UNRESPONSIVE
657         else:
658             # guid is an element
659             testbed = self._testbed_for_guid(guid)
660             if testbed is not None:
661                 return testbed.status(guid)
662             else:
663                 return AS.STATUS_UNDETERMINED
664
665     def set(self, guid, name, value, time = TIME_NOW):
666         testbed = self._testbed_for_guid(guid)
667         if testbed != None:
668             testbed.set(guid, name, value, time)
669         else:
670             raise RuntimeError("No element exists with guid %d" % guid)    
671
672     def get(self, guid, name, time = TIME_NOW):
673         testbed = self._testbed_for_guid(guid)
674         if testbed != None:
675             return testbed.get(guid, name, time)
676         raise RuntimeError("No element exists with guid %d" % guid)    
677
678     def get_deferred(self, guid, name, time = TIME_NOW):
679         testbed = self._testbed_for_guid(guid)
680         if testbed != None:
681             return testbed.get_deferred(guid, name, time)
682         raise RuntimeError("No element exists with guid %d" % guid)    
683
684     def get_factory_id(self, guid):
685         testbed = self._testbed_for_guid(guid)
686         if testbed != None:
687             return testbed.get_factory_id(guid)
688         raise RuntimeError("No element exists with guid %d" % guid)    
689
690     def get_testbed_id(self, guid):
691         testbed = self._testbed_for_guid(guid)
692         if testbed != None:
693             return testbed.testbed_id
694         raise RuntimeError("No element exists with guid %d" % guid)    
695
696     def get_testbed_version(self, guid):
697         testbed = self._testbed_for_guid(guid)
698         if testbed != None:
699             return testbed.testbed_version
700         raise RuntimeError("No element exists with guid %d" % guid)    
701
702     def shutdown(self):
703         exceptions = list()
704         ordered_testbeds = set()
705
706         def shutdown_testbed(guid):
707             try:
708                 testbed = self._testbeds[guid]
709                 ordered_testbeds.add(guid)
710                 testbed.shutdown()
711             except:
712                 exceptions.append(sys.exc_info())
713                 
714         self._logger.debug("ExperimentController: Starting parallel shutdown")
715         
716         for testbed_guids in reversed(self._testbed_order):
717             testbed_guids = set(testbed_guids) - ordered_testbeds
718             self._logger.debug("ExperimentController: Shutting down %r", testbed_guids)
719             self._parallel([functools.partial(shutdown_testbed, guid)
720                             for guid in testbed_guids])
721         remaining_guids = set(self._testbeds) - ordered_testbeds
722         if remaining_guids:
723             self._logger.debug("ExperimentController: Shutted down %r", ordered_testbeds)
724             self._logger.debug("ExperimentController: Shutting down %r", remaining_guids)
725             self._parallel([functools.partial(shutdown_testbed, guid)
726                             for guid in remaining_guids])
727             
728         for exc_info in exceptions:
729             raise exc_info[0], exc_info[1], exc_info[2]
730
731     def _testbed_for_guid(self, guid):
732         for testbed_guid in self._testbeds.keys():
733             if guid in self._guids_in_testbed(testbed_guid):
734                 if testbed_guid in self._failed_testbeds:
735                     return None
736                 return self._testbeds[testbed_guid]
737         return None
738
739     def _guids_in_testbed(self, testbed_guid):
740         if testbed_guid not in self._testbeds:
741             return set()
742         if testbed_guid not in self._guids_in_testbed_cache:
743             self._guids_in_testbed_cache[testbed_guid] = \
744                 set(self._testbeds[testbed_guid].guids)
745         return self._guids_in_testbed_cache[testbed_guid]
746
747     @staticmethod
748     def _netref_component_split(component):
749         match = COMPONENT_PATTERN.match(component)
750         if match:
751             return match.group("kind"), match.group("index")
752         else:
753             return component, None
754
755     _NETREF_COMPONENT_GETTERS = {
756         'addr':
757             lambda testbed, guid, index, name: 
758                 testbed.get_address(guid, int(index), name),
759         'route' :
760             lambda testbed, guid, index, name: 
761                 testbed.get_route(guid, int(index), name),
762         'trace' :
763             lambda testbed, guid, index, name: 
764                 testbed.trace(guid, index, attribute = name),
765         '' : 
766             lambda testbed, guid, index, name: 
767                 testbed.get(guid, name),
768     }
769     
770     def resolve_netref_value(self, value, failval = None):
771         rv = failval
772         while True:
773             for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
774                 label = match.group("label")
775                 if label.startswith('GUID-'):
776                     ref_guid = int(label[5:])
777                     if ref_guid:
778                         expr = match.group("expr")
779                         component = (match.group("component") or "")[1:] # skip the dot
780                         attribute = match.group("attribute")
781                         
782                         # split compound components into component kind and index
783                         # eg: 'addr[0]' -> ('addr', '0')
784                         component, component_index = self._netref_component_split(component)
785
786                         # find object and resolve expression
787                         for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
788                             if component not in self._NETREF_COMPONENT_GETTERS:
789                                 raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
790                             elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
791                                 pass
792                             else:
793                                 ref_value = self._NETREF_COMPONENT_GETTERS[component](
794                                     ref_testbed, ref_guid, component_index, attribute)
795                                 if ref_value:
796                                     value = rv = value.replace(match.group(), ref_value)
797                                     break
798                         else:
799                             # unresolvable netref
800                             return failval
801                         break
802             else:
803                 break
804         return rv
805     
806     def do_netrefs(self, data, fail_if_undefined = False):
807         # element netrefs
808         for (testbed_guid, guid), attrs in self._netrefs.items():
809             testbed = self._testbeds.get(testbed_guid)
810             if testbed is not None:
811                 for name in set(attrs):
812                     value = testbed.get(guid, name)
813                     if isinstance(value, basestring):
814                         ref_value = self.resolve_netref_value(value)
815                         if ref_value is not None:
816                             testbed.set(guid, name, ref_value)
817                             attrs.remove(name)
818                         elif fail_if_undefined:
819                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
820                 if not attrs:
821                     del self._netrefs[(testbed_guid, guid)]
822         
823         # testbed netrefs
824         for testbed_guid, attrs in self._testbed_netrefs.items():
825             tb_data = dict(data.get_attribute_data(testbed_guid))
826             if data:
827                 for name in set(attrs):
828                     value = tb_data.get(name)
829                     if isinstance(value, basestring):
830                         ref_value = self.resolve_netref_value(value)
831                         if ref_value is not None:
832                             data.set_attribute_data(testbed_guid, name, ref_value)
833                             attrs.remove(name)
834                         elif fail_if_undefined:
835                             raise ValueError, "Unresolvable netref in: %r" % (value,)
836                 if not attrs:
837                     del self._testbed_netrefs[testbed_guid]
838         
839
    def _init_testbed_controllers(self, data, recover = False):
        """Create and program the testbed controllers described by ``data``.

        Steps:
          1. collect element labels (they must be unique),
          2. create a controller for every testbed guid that has none yet;
             when ``recover`` is set and creation fails, the testbed's
             recovery policy decides whether it is recreated for recovery,
             recreated for restart, or marked as failed,
          3. queue the elements of every testbed not in the blacklist
             (testbeds that already had a controller before this call, plus
             those successfully re-attached in recover mode),
          4. rewrite label netrefs into GUID netrefs,
          5. program the queued elements into their controllers.

        :param data: experiment data object.
        :param recover: whether to try re-attaching to deployed testbeds.
        :returns: ``(to_recover, to_restart)``, sets of testbed guids.
        :raises RuntimeError: if an element label is not unique.
        """
        # testbeds whose elements must NOT be (re)programmed
        blacklist_testbeds = set(self._testbeds)
        element_guids = list()
        label_guids = dict()
        data_guids = data.guids
        to_recover = set()
        to_restart = set()

        # gather label associations
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                (testbed_guid, factory_id) = data.get_box_data(guid)
                label = data.get_attribute_data(guid, "label")
                if label is not None:
                    if label in label_guids:
                        raise RuntimeError, "Label %r is not unique" % (label,)
                    label_guids[label] = guid

        # create testbed controllers
        # NOTE(review): _create_testbed_controller may return without creating
        # anything (netreffed deployment attributes); the guid is still added
        # to blacklist_testbeds / to_restart below in that case.
        for guid in data_guids:
            if data.is_testbed_data(guid):
                if guid not in self._testbeds:
                    try:
                        self._create_testbed_controller(
                            guid, data, element_guids, recover)
                        if recover:
                            # Already programmed
                            blacklist_testbeds.add(guid)
                        else:
                            to_restart.add(guid)
                    except:
                        # Failure while re-attaching: apply the recovery policy.
                        # Outside recover mode the error is propagated.
                        if recover:
                            policy = self._testbed_recovery_policy(guid, data=data)
                            if policy == DC.POLICY_RECOVER:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_recover.add(guid)
                            elif policy == DC.POLICY_RESTART:
                                self._create_testbed_controller(
                                    guid, data, element_guids, False)
                                to_restart.add(guid)
                            else:
                                # Mark failed
                                self._failed_testbeds.add(guid)
                        else:
                            raise
        
        # queue programmable elements
        #  - that have not been programmed already (blacklist_testbeds)
        #  - including recovered or restarted testbeds
        # NOTE(review): no filtering on unresolved netrefs happens here,
        # despite what earlier revisions of this comment claimed
        for guid in data_guids:
            if not data.is_testbed_data(guid):
                (testbed_guid, factory_id) = data.get_box_data(guid)
                if testbed_guid not in blacklist_testbeds:
                    element_guids.append(guid)

        # replace label references in attribute values with GUID references
        self._resolve_labels(data, data_guids, label_guids)
    
        # program testbed controllers
        if element_guids:
            self._program_testbed_controllers(element_guids, data)
        
        return to_recover, to_restart
905
906     def _resolve_labels(self, data, data_guids, label_guids):
907         netrefs = self._netrefs
908         testbed_netrefs = self._testbed_netrefs
909         for guid in data_guids:
910             for name, value in data.get_attribute_data(guid):
911                 if isinstance(value, basestring):
912                     while True:
913                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
914                             label = match.group("label")
915                             if not label.startswith('GUID-'):
916                                 ref_guid = label_guids.get(label)
917                                 if ref_guid is not None:
918                                     value = value.replace(
919                                         match.group(),
920                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
921                                             guid = 'GUID-%d' % (ref_guid,),
922                                             expr = match.group("expr"),
923                                             label = label)
924                                     )
925                                     data.set_attribute_data(guid, name, value)
926                                     
927                                     # memorize which guid-attribute pairs require
928                                     # postprocessing, to avoid excessive controller-testbed
929                                     # communication at configuration time
930                                     # (which could require high-latency network I/O)
931                                     if not data.is_testbed_data(guid):
932                                         (testbed_guid, factory_id) = data.get_box_data(guid)
933                                         netrefs[(testbed_guid, guid)].add(name)
934                                     else:
935                                         testbed_netrefs[guid].add(name)
936                                     
937                                     break
938                         else:
939                             break
940
941     def _create_testbed_controller(self, guid, data, element_guids, recover):
942         (testbed_id, testbed_version) = data.get_testbed_data(guid)
943         deployment_config = self._deployment_config.get(guid)
944         
945         # deferred import because proxy needs
946         # our class definitions to define proxies
947         import nepi.util.proxy as proxy
948         
949         if deployment_config is None:
950             # need to create one
951             deployment_config = proxy.AccessConfiguration()
952             
953             for (name, value) in data.get_attribute_data(guid):
954                 if value is not None and deployment_config.has_attribute(name):
955                     # if any deployment config attribute has a netref, we can't
956                     # create this controller yet
957                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
958                         # remember to re-issue this one
959                         self._netreffed_testbeds.add(guid)
960                         return
961                     
962                     # copy deployment config attribute
963                     deployment_config.set_attribute_value(name, value)
964             
965             # commit config
966             self._deployment_config[guid] = deployment_config
967         
968         if deployment_config is not None:
969             # force recovery mode 
970             deployment_config.set_attribute_value("recover",recover)
971         
972         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
973                 deployment_config)
974         for (name, value) in data.get_attribute_data(guid):
975             testbed.defer_configure(name, value)
976         self._testbeds[guid] = testbed
977         if guid in self._netreffed_testbeds:
978             self._netreffed_testbeds.remove(guid)
979
980     def _program_testbed_controllers(self, element_guids, data):
981         def resolve_create_netref(data, guid, name, value): 
982             # Try to resolve create-time netrefs, if possible
983             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
984                 try:
985                     nuvalue = self.resolve_netref_value(value)
986                 except:
987                     # Any trouble means we're not in shape to resolve the netref yet
988                     nuvalue = None
989                 if nuvalue is not None:
990                     # Only if we succeed we remove the netref deferral entry
991                     value = nuvalue
992                     data.set_attribute_data(guid, name, value)
993                     if (testbed_guid, guid) in self._netrefs:
994                         self._netrefs[(testbed_guid, guid)].discard(name)
995             return value
996
997         for guid in element_guids:
998             (testbed_guid, factory_id) = data.get_box_data(guid)
999             testbed = self._testbeds.get(testbed_guid)
1000             if testbed is not None:
1001                 # create
1002                 testbed.defer_create(guid, factory_id)
1003                 # set attributes
1004                 for (name, value) in data.get_attribute_data(guid):
1005                     value = resolve_create_netref(data, guid, name, value)
1006                     testbed.defer_create_set(guid, name, value)
1007
1008         for guid in element_guids:
1009             (testbed_guid, factory_id) = data.get_box_data(guid)
1010             testbed = self._testbeds.get(testbed_guid)
1011             if testbed is not None:
1012                 # traces
1013                 for trace_id in data.get_trace_data(guid):
1014                     testbed.defer_add_trace(guid, trace_id)
1015                 # addresses
1016                 for (address, netprefix, broadcast) in data.get_address_data(guid):
1017                     if address != None:
1018                         testbed.defer_add_address(guid, address, netprefix, 
1019                                 broadcast)
1020                 # routes
1021                 for (destination, netprefix, nexthop, metric, device) in \
1022                         data.get_route_data(guid):
1023                     testbed.defer_add_route(guid, destination, netprefix, nexthop, 
1024                             metric, device)
1025                 # store connections data
1026                 for (connector_type_name, other_guid, other_connector_type_name) \
1027                         in data.get_connection_data(guid):
1028                     (other_testbed_guid, other_factory_id) = data.get_box_data(
1029                             other_guid)
1030                     if testbed_guid == other_testbed_guid:
1031                         # each testbed should take care of enforcing internal
1032                         # connection simmetry, so each connection is only
1033                         # added in one direction
1034                         testbed.defer_connect(guid, connector_type_name, 
1035                                 other_guid, other_connector_type_name)
1036
1037     def _program_testbed_cross_connections(self, data):
1038         data_guids = data.guids
1039         for guid in data_guids: 
1040             if not data.is_testbed_data(guid):
1041                 (testbed_guid, factory_id) = data.get_box_data(guid)
1042                 testbed = self._testbeds.get(testbed_guid)
1043                 if testbed is not None:
1044                     for (connector_type_name, cross_guid, cross_connector_type_name) \
1045                             in data.get_connection_data(guid):
1046                         (testbed_guid, factory_id) = data.get_box_data(guid)
1047                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1048                                 cross_guid)
1049                         if testbed_guid != cross_testbed_guid:
1050                             cross_testbed = self._testbeds[cross_testbed_guid]
1051                             cross_testbed_id = cross_testbed.testbed_id
1052                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1053                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1054                                     cross_connector_type_name)
1055                             # save cross data for later
1056                             self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1057                                     (testbed_guid, guid, cross_testbed_guid, cross_guid))
1058                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1059                                     cross_guid)
1060
1061     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1062         if testbed_guid not in self._cross_data:
1063             self._cross_data[testbed_guid] = dict()
1064         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1065             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1066         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1067
    def _get_cross_data(self, testbed_guid):
        """Collect the attributes of the cross-connection peers of a testbed.

        Peers are those recorded by ``_add_crossdata`` for ``testbed_guid``.

        :param testbed_guid: guid of the testbed whose peers are queried.
        :returns: nested dict ``{cross_testbed_guid: {cross_guid: attrs}}``
            where ``attrs`` maps each attribute name of the remote element
            to its value, plus the bookkeeping keys ``_guid``,
            ``_testbed_guid``, ``_testbed_id`` and ``_testbed_version``.
            An empty dict when no peers were recorded.
        """
        cross_data = dict()
        if not testbed_guid in self._cross_data:
            return cross_data

        # fetch attribute lists in one batch
        # (the *_deferred calls presumably let the proxies pipeline requests
        # instead of waiting for each reply -- TODO confirm; either way the
        # order of the three loops below should be preserved)
        attribute_lists = dict()
        for cross_testbed_guid, guid_list in \
                self._cross_data[testbed_guid].iteritems():
            cross_testbed = self._testbeds[cross_testbed_guid]
            for cross_guid in guid_list:
                attribute_lists[(cross_testbed_guid, cross_guid)] = \
                    cross_testbed.get_attribute_list_deferred(cross_guid)

        # fetch attribute values in another batch
        for cross_testbed_guid, guid_list in \
                self._cross_data[testbed_guid].iteritems():
            cross_data[cross_testbed_guid] = dict()
            cross_testbed = self._testbeds[cross_testbed_guid]
            for cross_guid in guid_list:
                elem_cross_data = dict(
                    _guid = cross_guid,
                    _testbed_guid = cross_testbed_guid,
                    _testbed_id = cross_testbed.testbed_id,
                    _testbed_version = cross_testbed.testbed_version)
                cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
                attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
                # NOTE(review): iterating the deferred list presumably forces
                # it; an attribute named like a bookkeeping key (e.g. "_guid")
                # would overwrite that key here
                for attr_name in attribute_list:
                    attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
                    elem_cross_data[attr_name] = attr_value
        
        # undefer all values - we'll have to serialize them probably later
        # (only values are replaced, so mutating during iteration is safe)
        for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
            for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
                for attr_name, attr_value in elem_cross_data.iteritems():
                    elem_cross_data[attr_name] = _undefer(attr_value)
        
        return cross_data
1106
class ExperimentSuite(object):
    """Run the same experiment several times, one repetition after another.

    Every repetition gets its own experiment controller, deployed in daemon
    mode with its own root directory (``<ROOT_DIRECTORY>_<n>``, with ``n``
    starting at 1). The repetitions run sequentially in a background thread
    launched by :meth:`start`; :meth:`is_finished` reports when all of them
    are done.
    """
    def __init__(self, experiment_xml, access_config, repetitions = None,
            duration = None, wait_guids = None):
        """
        :param experiment_xml: experiment description given to every
            repetition's controller.
        :param access_config: template configuration; its truthy attribute
            values are copied into each repetition's access configuration.
        :param repetitions: number of repetitions (1 when not given or falsy).
        :param duration: minimum running time of each repetition, in seconds.
        :param wait_guids: guids that must have finished executing before a
            repetition is stopped.
        """
        self._experiment_xml = experiment_xml
        self._access_config = access_config
        # per-repetition controllers / access configs, keyed by repetition number
        self._controllers = dict()
        self._access_configs = dict()
        self._repetitions = 1 if not repetitions else repetitions
        self._duration = duration
        self._wait_guids = wait_guids
        self._current = None
        self._status = TS.STATUS_ZERO
        self._thread = None

    def current(self):
        """Return the number of the repetition running now (or run last)."""
        return self._current

    def status(self):
        return self._status

    def is_finished(self):
        return self._status == TS.STATUS_STOPPED

    def get_access_configurations(self):
        return self._access_configs.values()

    def start(self):
        """Launch the repetitions in a background thread."""
        self._status  = TS.STATUS_STARTED
        self._thread = threading.Thread(target = self._run_experiment_suite)
        self._thread.start()

    def shutdown(self):
        """Wait for the suite thread, then shut down every controller."""
        if self._thread:
            self._thread.join()
            self._thread = None
        for controller in self._controllers.values():
            controller.shutdown()

    def get_current_access_config(self):
        return self._access_configs[self._current]

    def _repetition_numbers(self):
        """Return the 1-based numbers of all the repetitions to run."""
        # upper bound is exclusive: +1 so that exactly _repetitions runs happen
        return range(1, self._repetitions + 1)

    def _run_experiment_suite(self):
        for i in self._repetition_numbers():
            self._current = i
            self._run_one_experiment()
        self._status = TS.STATUS_STOPPED

    def _wait_guids_finished(self, controller):
        """Return True when every guid in wait_guids has finished on controller."""
        return all(controller.is_finished(guid) for guid in self._wait_guids)

    def _run_one_experiment(self):
        """Deploy, run and stop a single repetition (``self._current``)."""
        from nepi.util import proxy
        access_config = proxy.AccessConfiguration()
        for attr in self._access_config.attributes:
            if attr.value:
                access_config.set_attribute_value(attr.name, attr.value)
        access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
        root_dir = "%s_%d" % (
                access_config.get_attribute_value(DC.ROOT_DIRECTORY), 
                self._current)
        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
        controller = proxy.create_experiment_controller(self._experiment_xml,
                access_config)
        self._access_configs[self._current] = access_config
        self._controllers[self._current] = controller
        controller.start()
        started_at = time.time()
        # wait until all specified guids have finished execution
        if self._wait_guids:
            while not self._wait_guids_finished(controller):
                time.sleep(0.5)
        # wait until the minimum experiment duration time has elapsed 
        if self._duration:
            while (time.time() - started_at) < self._duration:
                time.sleep(0.5)
        controller.stop()
1180