9f034cbbb01c12241e21c250b76a74e7413916e1
[nepi.git] / src / nepi / core / execute.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.core.attributes import Attribute, AttributesMap
4 from nepi.util import validation
5 from nepi.util.constants import ApplicationStatus as AS, TestbedStatus as TS, TIME_NOW, DeploymentConfiguration as DC
6 from nepi.util.parser._xml import XmlExperimentParser
7 import sys
8 import re
9 import threading
10 import ConfigParser
11 import os
12 import collections
13 import functools
14 import time
15 import logging
# NOTE(review): calling logging.basicConfig() at import time may install a
# default handler on the root logger of any program importing this module -
# confirm this is intended.
logging.basicConfig()

# Matches a netref of the form  {#[<label>]<component>.[<attribute>]#}
#   label     - element reference; resolve_netref_value only resolves labels
#               of the form 'GUID-<n>'
#   component - optional '.addr[<n>]', '.route[<n>]' or '.trace[<name>]'
#   attribute - attribute name to fetch from the referenced element/component
#   expr      - everything after the label: component, separator and
#               '[<attribute>]'
# NOTE(review): the '.' before the attribute bracket is unescaped, so any
# single character is accepted as separator; an escaped dot was presumably
# intended.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[-a-zA-Z0-9._]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template that rebuilds a netref from a 'guid' label and an 'expr' suffix;
# presumably used elsewhere to rewrite labels into 'GUID-<n>' form (it is not
# used in this part of the file).
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a netref component such as 'addr[0]' into kind ('addr') and
# index ('0').
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
21
22 def _undefer(deferred):
23     if hasattr(deferred, '_get'):
24         return deferred._get()
25     else:
26         return deferred
27
28
class TestbedController(object):
    """
    Interface implemented by every testbed controller.

    The ExperimentController first programs a controller through the
    defer_* methods, which only record instructions. It then executes them
    through the do_* steps in this order: do_setup, do_create,
    do_connect_init, do_connect_compl, do_preconfigure, do_configure,
    do_cross_connect_init, do_cross_connect_compl, do_prestart, and finally
    start.

    Except for the constructor and the testbed_id / testbed_version
    properties, every method here raises NotImplementedError and must be
    provided by a subclass.
    """
    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def testbed_id(self):
        """The testbed_id given to the constructor."""
        return self._testbed_id

    @property
    def testbed_version(self):
        """The testbed_version given to the constructor."""
        return self._testbed_version

    @property
    def guids(self):
        """Guids of the elements managed by this testbed (any iterable)."""
        raise NotImplementedError

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs creation of element `guid` from factory `factory_id`"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, 
            guid, connector_type_name,
            cross_guid, cross_testbed_guid,
            cross_testbed_id, cross_factory_id,
            cross_connector_type_name):
        """
        Instructs creation of a connection between the given connectors 
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
        """Instructs the addition of a route"""
        raise NotImplementedError

    def do_setup(self):
        """After do_setup the testbed initial configuration is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and their
        initial attributes are set
        """
        raise NotImplementedError

    def do_connect_init(self):
        """
        After do_connect_init all internal connections between testbed elements
        are initiated
        """
        raise NotImplementedError

    def do_connect_compl(self):
        """
        After do_connect_compl all internal connections between testbed
        elements are completed
        """
        raise NotImplementedError

    def do_preconfigure(self):
        """
        Done just before resolving netrefs, after connection, before cross connections,
        useful for early stages of configuration, for setting up stuff that might be
        required for netref resolution.
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure elements are configured"""
        raise NotImplementedError

    def do_prestart(self):
        """Before start elements are prestart-configured"""
        raise NotImplementedError

    def do_cross_connect_init(self, cross_data):
        """
        After do_cross_connect_init initiation of all external connections 
        between different testbed elements is performed

        Params:
            cross_data: data the ExperimentController gathered for this
                testbed's cross connections
        """
        raise NotImplementedError

    def do_cross_connect_compl(self, cross_data):
        """
        After do_cross_connect_compl completion of all external connections 
        between different testbed elements is performed

        Params:
            cross_data: data the ExperimentController gathered for this
                testbed's cross connections
        """
        raise NotImplementedError

    def start(self):
        """Start the testbed; called by the ExperimentController after do_prestart."""
        raise NotImplementedError

    def stop(self):
        """Stop the testbed."""
        raise NotImplementedError

    def recover(self):
        """
        On testbed recovery (if recovery is a supported policy), the controller
        instance will be re-created and the following sequence invoked:
        
            do_setup
            defer_X - programming the testbed with persisted execution values
                (not design values). Execution values (ExecImmutable attributes)
                should be enough to recreate the testbed's state.
            *recover*
            <cross-connection methods>
            
        Start will not be called, and after cross connection invocations,
        the testbed is supposed to be fully functional again.
        """
        raise NotImplementedError

    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute `name` of element `guid` to `value` at `time`."""
        raise NotImplementedError

    def get(self, guid, name, time = TIME_NOW):
        """Return attribute `name` of element `guid` at `time`."""
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of box to query
            index: number of routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of box to query
            index: number of interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def get_attribute_list(self, guid, filter_flags = None, exclude = False):
        """
        Return the attribute names of element `guid`.

        filter_flags presumably restricts the list to attributes carrying the
        given flags, and exclude presumably inverts that filter - confirm.
        """
        raise NotImplementedError

    def get_factory_id(self, guid):
        """Return the factory id element `guid` was created from."""
        raise NotImplementedError

    def action(self, time, guid, action):
        """Perform `action` on element `guid` at `time`."""
        raise NotImplementedError

    def status(self, guid):
        """
        Return the status of element `guid`.

        ExperimentController.is_finished compares the result with
        ApplicationStatus.STATUS_FINISHED.

        NOTE(review): ExperimentController.status() also calls status() with
        no guid to query the testbed itself, so implementations presumably
        accept a missing guid - confirm.
        """
        raise NotImplementedError
    
    def testbed_status(self):
        """Return the status of the testbed itself."""
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        """
        Return information about trace `trace_id` of element `guid`.

        `attribute` selects which piece of information is returned
        (default 'value').
        """
        raise NotImplementedError

    def traces_info(self):
        """
        Return a dictionary of dictionaries:

            {
                guid: {
                    trace_id: {
                        'host': host,
                        'filepath': filepath,
                        'filesize': size in bytes,
                    },
                },
            }
        """
        raise NotImplementedError

    def shutdown(self):
        """Shut the testbed down; called from ExperimentController.shutdown."""
        raise NotImplementedError
227
228 class ExperimentController(object):
229     def __init__(self, experiment_xml, root_dir):
230         self._experiment_design_xml = experiment_xml
231         self._experiment_execute_xml = None
232         self._testbeds = dict()
233         self._deployment_config = dict()
234         self._netrefs = collections.defaultdict(set)
235         self._testbed_netrefs = collections.defaultdict(set)
236         self._cross_data = dict()
237         self._root_dir = root_dir
238         self._netreffed_testbeds = set()
239         self._guids_in_testbed_cache = dict()
240         self._failed_testbeds = set()
241         self._started_time = None
242         self._stopped_time = None
243         self._testbed_order = []
244       
245         self._logger = logging.getLogger('nepi.core.execute')
246         level = logging.ERROR
247         if os.environ.get("NEPI_CONTROLLER_LOGLEVEL", 
248                 DC.ERROR_LEVEL) == DC.DEBUG_LEVEL:
249             level = logging.DEBUG
250         self._logger.setLevel(level)
251  
252         if experiment_xml is None and root_dir is not None:
253             # Recover
254             self.load_experiment_xml()
255             self.load_execute_xml()
256         else:
257             self.persist_experiment_xml()
258
259     @property
260     def experiment_design_xml(self):
261         return self._experiment_design_xml
262
263     @property
264     def experiment_execute_xml(self):
265         return self._experiment_execute_xml
266
267     @property
268     def started_time(self):
269         return self._started_time
270
271     @property
272     def stopped_time(self):
273         return self._stopped_time
274
275     @property
276     def guids(self):
277         guids = list()
278         for testbed_guid in self._testbeds.keys():
279             _guids = self._guids_in_testbed(testbed_guid)
280             if _guids:
281                 guids.extend(_guids)
282         return guids
283
284     def persist_experiment_xml(self):
285         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
286         f = open(xml_path, "w")
287         f.write(self._experiment_design_xml)
288         f.close()
289
290     def persist_execute_xml(self):
291         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
292         f = open(xml_path, "w")
293         f.write(self._experiment_execute_xml)
294         f.close()
295
296     def load_experiment_xml(self):
297         xml_path = os.path.join(self._root_dir, "experiment-design.xml")
298         f = open(xml_path, "r")
299         self._experiment_design_xml = f.read()
300         f.close()
301
302     def load_execute_xml(self):
303         xml_path = os.path.join(self._root_dir, "experiment-execute.xml")
304         f = open(xml_path, "r")
305         self._experiment_execute_xml = f.read()
306         f.close()
307
308     def trace(self, guid, trace_id, attribute='value'):
309         testbed = self._testbed_for_guid(guid)
310         if testbed != None:
311             return testbed.trace(guid, trace_id, attribute)
312         raise RuntimeError("No element exists with guid %d" % guid)    
313
314     def traces_info(self):
315         traces_info = dict()
316         for guid, testbed in self._testbeds.iteritems():
317             tinfo = testbed.traces_info()
318             if tinfo:
319                 traces_info[guid] = testbed.traces_info()
320         return traces_info
321
322     @staticmethod
323     def _parallel(callables):
324         excs = []
325         def wrap(callable):
326             def wrapped(*p, **kw):
327                 try:
328                     callable(*p, **kw)
329                 except:
330                     logging.exception("Exception occurred in asynchronous thread:")
331                     excs.append(sys.exc_info())
332             try:
333                 wrapped = functools.wraps(callable)(wrapped)
334             except:
335                 # functools.partial not wrappable
336                 pass
337             return wrapped
338         threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
339         for thread in threads:
340             thread.start()
341         for thread in threads:
342             thread.join()
343         for exc in excs:
344             eTyp, eVal, eLoc = exc
345             raise eTyp, eVal, eLoc
346
347     def start(self):
348         self._started_time = time.time() 
349         self._start()
350
351     def _start(self, recover = False):
352         parser = XmlExperimentParser()
353         
354         if recover:
355             xml = self._experiment_execute_xml
356         else:
357             xml = self._experiment_design_xml
358         data = parser.from_xml_to_data(xml)
359
360         # instantiate testbed controllers
361         to_recover, to_restart = self._init_testbed_controllers(data, recover)
362         all_restart = set(to_restart)
363         
364         if not recover:
365             # persist testbed connection data, for potential recovery
366             self._persist_testbed_proxies()
367         else:
368             # recover recoverable controllers
369             for guid in to_recover:
370                 try:
371                     self._testbeds[guid].do_setup()
372                     self._testbeds[guid].recover()
373                 except:
374                     self._logger.exception("During recovery of testbed %s", guid)
375                     
376                     # Mark failed
377                     self._failed_testbeds.add(guid)
378     
379         def steps_to_configure(self, allowed_guids):
380             # perform setup in parallel for all test beds,
381             # wait for all threads to finish
382
383             self._logger.debug("ExperimentController: Starting parallel do_setup")
384             self._parallel([testbed.do_setup 
385                             for guid,testbed in self._testbeds.iteritems()
386                             if guid in allowed_guids])
387        
388             # perform create-connect in parallel, wait
389             # (internal connections only)
390             self._logger.debug("ExperimentController: Starting parallel do_create")
391             self._parallel([testbed.do_create
392                             for guid,testbed in self._testbeds.iteritems()
393                             if guid in allowed_guids])
394
395             self._logger.debug("ExperimentController: Starting parallel do_connect_init")
396             self._parallel([testbed.do_connect_init
397                             for guid,testbed in self._testbeds.iteritems()
398                             if guid in allowed_guids])
399
400             self._logger.debug("ExperimentController: Starting parallel do_connect_fin")
401             self._parallel([testbed.do_connect_compl
402                             for guid,testbed in self._testbeds.iteritems()
403                             if guid in allowed_guids])
404
405             self._logger.debug("ExperimentController: Starting parallel do_preconfigure")
406             self._parallel([testbed.do_preconfigure
407                             for guid,testbed in self._testbeds.iteritems()
408                             if guid in allowed_guids])
409             self._clear_caches()
410             
411             # Store testbed order
412             self._testbed_order.append(allowed_guids)
413
414         steps_to_configure(self, to_restart)
415
416         if self._netreffed_testbeds:
417             self._logger.debug("ExperimentController: Resolving netreffed testbeds")
418             # initally resolve netrefs
419             self.do_netrefs(data, fail_if_undefined=False)
420             
421             # rinse and repeat, for netreffed testbeds
422             netreffed_testbeds = set(self._netreffed_testbeds)
423
424             to_recover, to_restart = self._init_testbed_controllers(data, recover)
425             all_restart.update(to_restart)
426             
427             if not recover:
428                 # persist testbed connection data, for potential recovery
429                 self._persist_testbed_proxies()
430             else:
431                 # recover recoverable controllers
432                 for guid in to_recover:
433                     try:
434                         self._testbeds[guid].do_setup()
435                         self._testbeds[guid].recover()
436                     except:
437                         self._logger.exception("During recovery of testbed %s", guid)
438
439                         # Mark failed
440                         self._failed_testbeds.add(guid)
441
442             # configure dependant testbeds
443             steps_to_configure(self, to_restart)
444         
445         all_restart = [ self._testbeds[guid] for guid in all_restart ]
446             
447         # final netref step, fail if anything's left unresolved
448         self._logger.debug("ExperimentController: Resolving do_netrefs")
449         self.do_netrefs(data, fail_if_undefined=False)
450        
451         # Only now, that netref dependencies have been solve, it is safe to
452         # program cross_connections
453         self._logger.debug("ExperimentController: Programming testbed cross-connections")
454         self._program_testbed_cross_connections(data)
455  
456         # perform do_configure in parallel for al testbeds
457         # (it's internal configuration for each)
458         self._logger.debug("ExperimentController: Starting parallel do_configure")
459         self._parallel([testbed.do_configure
460                         for testbed in all_restart])
461
462         self._clear_caches()
463
464         #print >>sys.stderr, "DO IT"
465         #import time
466         #time.sleep(60)
467         
468         # cross-connect (cannot be done in parallel)
469         self._logger.debug("ExperimentController: Starting cross-connect")
470         for guid, testbed in self._testbeds.iteritems():
471             cross_data = self._get_cross_data(guid)
472             testbed.do_cross_connect_init(cross_data)
473         for guid, testbed in self._testbeds.iteritems():
474             cross_data = self._get_cross_data(guid)
475             testbed.do_cross_connect_compl(cross_data)
476        
477         self._clear_caches()
478
479         # Last chance to configure (parallel on all testbeds)
480         self._logger.debug("ExperimentController: Starting parallel do_prestart")
481         self._parallel([testbed.do_prestart
482                         for testbed in all_restart])
483
484         # final netref step, fail if anything's left unresolved
485         self.do_netrefs(data, fail_if_undefined=True)
486  
487         self._clear_caches()
488         
489         if not recover:
490             # update execution xml with execution-specific values
491             # TODO: BUG! BUggy code! cant stand all serializing all attribute values (ej: tun_key which is non ascci)"
492             self._update_execute_xml()
493             self.persist_execute_xml()
494
495         # start experiment (parallel start on all testbeds)
496         self._logger.debug("ExperimentController: Starting parallel do_start")
497         self._parallel([testbed.start
498                         for testbed in all_restart])
499
500         self._clear_caches()
501
502     def _clear_caches(self):
503         # Cleaning cache for safety.
504         self._guids_in_testbed_cache = dict()
505
506     def _persist_testbed_proxies(self):
507         TRANSIENT = (DC.RECOVER,)
508         
509         # persist access configuration for all testbeds, so that
510         # recovery mode can reconnect to them if it becomes necessary
511         conf = ConfigParser.RawConfigParser()
512         for testbed_guid, testbed_config in self._deployment_config.iteritems():
513             testbed_guid = str(testbed_guid)
514             conf.add_section(testbed_guid)
515             for attr in testbed_config.get_attribute_list():
516                 if attr not in TRANSIENT:
517                     value = testbed_config.get_attribute_value(attr)
518                     if value is not None:
519                         conf.set(testbed_guid, attr, value)
520         
521         f = open(os.path.join(self._root_dir, 'deployment_config.ini'), 'w')
522         conf.write(f)
523         f.close()
524     
525     def _load_testbed_proxies(self):
526         TYPEMAP = {
527             Attribute.STRING : 'get',
528             Attribute.BOOL : 'getboolean',
529             Attribute.ENUM : 'get',
530             Attribute.DOUBLE : 'getfloat',
531             Attribute.INTEGER : 'getint',
532         }
533         
534         TRANSIENT = (DC.RECOVER,)
535         
536         # deferred import because proxy needs
537         # our class definitions to define proxies
538         import nepi.util.proxy as proxy
539         
540         conf = ConfigParser.RawConfigParser()
541         conf.read(os.path.join(self._root_dir, 'deployment_config.ini'))
542         for testbed_guid in conf.sections():
543             testbed_config = proxy.AccessConfiguration()
544             testbed_guid = str(testbed_guid)
545             for attr in testbed_config.get_attribute_list():
546                 if attr not in TRANSIENT:
547                     getter = getattr(conf, TYPEMAP.get(
548                         testbed_config.get_attribute_type(attr),
549                         'get') )
550                     try:
551                         value = getter(testbed_guid, attr)
552                         testbed_config.set_attribute_value(attr, value)
553                     except ConfigParser.NoOptionError:
554                         # Leave default
555                         pass
556     
557     def _unpersist_testbed_proxies(self):
558         try:
559             os.remove(os.path.join(self._root_dir, 'deployment_config.ini'))
560         except:
561             # Just print exceptions, this is just cleanup
562             self._logger.exception("Loading testbed configuration")
563
564     def _update_execute_xml(self):
565         # For all testbeds,
566         #   For all elements in testbed,
567         #       - gather immutable execute-readable attribuets lists
568         #         asynchronously
569         # Generate new design description from design xml
570         # (Wait for attributes lists - implicit syncpoint)
571         # For all testbeds,
572         #   For all elements in testbed,
573         #       - gather all immutable execute-readable attribute
574         #         values, asynchronously
575         # (Wait for attribute values - implicit syncpoint)
576         # For all testbeds,
577         #   For all elements in testbed,
578         #       - inject non-None values into new design
579         # Generate execute xml from new design
580
581         attribute_lists = dict(
582             (testbed_guid, collections.defaultdict(dict))
583             for testbed_guid in self._testbeds
584         )
585         
586         for testbed_guid, testbed in self._testbeds.iteritems():
587             guids = self._guids_in_testbed(testbed_guid)
588             for guid in guids:
589                 attribute_lists[testbed_guid][guid] = \
590                     testbed.get_attribute_list_deferred(guid, Attribute.ExecImmutable)
591         
592         parser = XmlExperimentParser()
593         execute_data = parser.from_xml_to_data(self._experiment_design_xml)
594
595         attribute_values = dict(
596             (testbed_guid, collections.defaultdict(dict))
597             for testbed_guid in self._testbeds
598         )
599         
600         for testbed_guid, testbed_attribute_lists in attribute_lists.iteritems():
601             testbed = self._testbeds[testbed_guid]
602             for guid, attribute_list in testbed_attribute_lists.iteritems():
603                 attribute_list = _undefer(attribute_list)
604                 attribute_values[testbed_guid][guid] = dict(
605                     (attribute, testbed.get_deferred(guid, attribute))
606                     for attribute in attribute_list
607                 )
608         
609         for testbed_guid, testbed_attribute_values in attribute_values.iteritems():
610             for guid, attribute_values in testbed_attribute_values.iteritems():
611                 for attribute, value in attribute_values.iteritems():
612                     value = _undefer(value)
613                     if value is not None:
614                         execute_data.add_attribute_data(guid, attribute, value)
615         
616         self._experiment_execute_xml = parser.to_xml(data=execute_data)
617
618     def stop(self):
619        for testbed in self._testbeds.values():
620            testbed.stop()
621        self._unpersist_testbed_proxies()
622        self._stopped_time = time.time() 
623    
624     def recover(self):
625         # reload perviously persisted testbed access configurations
626         self._failed_testbeds.clear()
627         self._load_testbed_proxies()
628
629         # re-program testbeds that need recovery
630         self._start(recover = True)
631
632     def is_finished(self, guid):
633         testbed = self._testbed_for_guid(guid)
634         if testbed != None:
635             return testbed.status(guid) == AS.STATUS_FINISHED
636         raise RuntimeError("No element exists with guid %d" % guid)    
637     
638     def _testbed_recovery_policy(self, guid, data = None):
639         if data is None:
640             parser = XmlExperimentParser()
641             data = parser.from_xml_to_data(self._experiment_design_xml)
642         
643         return data.get_attribute_data(guid, DC.RECOVERY_POLICY)
644
645     def status(self, guid):
646         if guid in self._testbeds:
647             # guid is a testbed
648             # report testbed status
649             if guid in self._failed_testbeds:
650                 return TS.STATUS_FAILED
651             else:
652                 try:
653                     return self._testbeds[guid].status()
654                 except:
655                     return TS.STATUS_UNRESPONSIVE
656         else:
657             # guid is an element
658             testbed = self._testbed_for_guid(guid)
659             if testbed is not None:
660                 return testbed.status(guid)
661             else:
662                 return AS.STATUS_UNDETERMINED
663
664     def set(self, guid, name, value, time = TIME_NOW):
665         testbed = self._testbed_for_guid(guid)
666         if testbed != None:
667             testbed.set(guid, name, value, time)
668         else:
669             raise RuntimeError("No element exists with guid %d" % guid)    
670
671     def get(self, guid, name, time = TIME_NOW):
672         testbed = self._testbed_for_guid(guid)
673         if testbed != None:
674             return testbed.get(guid, name, time)
675         raise RuntimeError("No element exists with guid %d" % guid)    
676
677     def get_deferred(self, guid, name, time = TIME_NOW):
678         testbed = self._testbed_for_guid(guid)
679         if testbed != None:
680             return testbed.get_deferred(guid, name, time)
681         raise RuntimeError("No element exists with guid %d" % guid)    
682
683     def get_factory_id(self, guid):
684         testbed = self._testbed_for_guid(guid)
685         if testbed != None:
686             return testbed.get_factory_id(guid)
687         raise RuntimeError("No element exists with guid %d" % guid)    
688
689     def get_testbed_id(self, guid):
690         testbed = self._testbed_for_guid(guid)
691         if testbed != None:
692             return testbed.testbed_id
693         raise RuntimeError("No element exists with guid %d" % guid)    
694
695     def get_testbed_version(self, guid):
696         testbed = self._testbed_for_guid(guid)
697         if testbed != None:
698             return testbed.testbed_version
699         raise RuntimeError("No element exists with guid %d" % guid)    
700
701     def shutdown(self):
702         exceptions = list()
703         ordered_testbeds = set()
704
705         def shutdown_testbed(guid):
706             try:
707                 testbed = self._testbeds[guid]
708                 ordered_testbeds.add(guid)
709                 testbed.shutdown()
710             except:
711                 exceptions.append(sys.exc_info())
712                 
713         self._logger.debug("ExperimentController: Starting parallel shutdown")
714         
715         for testbed_guids in reversed(self._testbed_order):
716             testbed_guids = set(testbed_guids) - ordered_testbeds
717             self._logger.debug("ExperimentController: Shutting down %r", testbed_guids)
718             self._parallel([functools.partial(shutdown_testbed, guid)
719                             for guid in testbed_guids])
720         remaining_guids = set(self._testbeds) - ordered_testbeds
721         if remaining_guids:
722             self._logger.debug("ExperimentController: Shutted down %r", ordered_testbeds)
723             self._logger.debug("ExperimentController: Shutting down %r", remaining_guids)
724             self._parallel([functools.partial(shutdown_testbed, guid)
725                             for guid in remaining_guids])
726             
727         for exc_info in exceptions:
728             raise exc_info[0], exc_info[1], exc_info[2]
729
730     def _testbed_for_guid(self, guid):
731         for testbed_guid in self._testbeds.keys():
732             if guid in self._guids_in_testbed(testbed_guid):
733                 if testbed_guid in self._failed_testbeds:
734                     return None
735                 return self._testbeds[testbed_guid]
736         return None
737
738     def _guids_in_testbed(self, testbed_guid):
739         if testbed_guid not in self._testbeds:
740             return set()
741         if testbed_guid not in self._guids_in_testbed_cache:
742             self._guids_in_testbed_cache[testbed_guid] = \
743                 set(self._testbeds[testbed_guid].guids)
744         return self._guids_in_testbed_cache[testbed_guid]
745
746     @staticmethod
747     def _netref_component_split(component):
748         match = COMPONENT_PATTERN.match(component)
749         if match:
750             return match.group("kind"), match.group("index")
751         else:
752             return component, None
753
754     _NETREF_COMPONENT_GETTERS = {
755         'addr':
756             lambda testbed, guid, index, name: 
757                 testbed.get_address(guid, int(index), name),
758         'route' :
759             lambda testbed, guid, index, name: 
760                 testbed.get_route(guid, int(index), name),
761         'trace' :
762             lambda testbed, guid, index, name: 
763                 testbed.trace(guid, index, attribute = name),
764         '' : 
765             lambda testbed, guid, index, name: 
766                 testbed.get(guid, name),
767     }
768     
    def resolve_netref_value(self, value, failval = None):
        """
        Substitute the GUID-based netrefs found in `value`.

        `value` is scanned with ATTRIBUTE_PATTERN_BASE. A match is resolved
        when its label has the form 'GUID-<n>' with n != 0:
          - the testbed containing element n is looked up;
          - the getter registered in _NETREF_COMPONENT_GETTERS for the
            match's component kind is called;
          - every occurrence of the matched text is replaced by the result.
        After each substitution the string is rescanned from the start.
        Matches with other labels are left untouched.

        Params:
            value: string possibly containing netrefs.
            failval: returned when no GUID netref was substituted at all, or
                when one cannot be resolved (no testbed contains the guid,
                the getter returns a falsy value, or there are no testbeds).

        Returns:
            The substituted string, or failval.

        Raises:
            ValueError: a GUID netref has an unknown component kind (only
                detected while iterating testbeds, i.e. when at least one
                testbed exists).
        """
        rv = failval
        while True:
            for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
                label = match.group("label")
                if label.startswith('GUID-'):
                    # NOTE(review): int() raises ValueError for labels such
                    # as 'GUID-x'; confirm such labels cannot reach here.
                    ref_guid = int(label[5:])
                    if ref_guid:
                        expr = match.group("expr")
                        component = (match.group("component") or "")[1:] # skip the dot
                        attribute = match.group("attribute")
                        
                        # split compound components into component kind and index
                        # eg: 'addr[0]' -> ('addr', '0')
                        component, component_index = self._netref_component_split(component)

                        # find object and resolve expression
                        for ref_testbed_guid, ref_testbed in self._testbeds.iteritems():
                            if component not in self._NETREF_COMPONENT_GETTERS:
                                raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
                            elif ref_guid not in self._guids_in_testbed(ref_testbed_guid):
                                pass
                            else:
                                ref_value = self._NETREF_COMPONENT_GETTERS[component](
                                    ref_testbed, ref_guid, component_index, attribute)
                                # Falsy results (None, '', 0) count as unresolved.
                                # NOTE(review): str.replace needs a string; a
                                # non-string ref_value (e.g. a numeric prefix)
                                # would raise TypeError - confirm getters return
                                # strings. A ref_value that itself contains a
                                # netref is rescanned and could loop forever if
                                # self-referential.
                                if ref_value:
                                    value = rv = value.replace(match.group(), ref_value)
                                    break
                        else:
                            # unresolvable netref
                            return failval
                        # value was rewritten: rescan it from the start
                        break
            else:
                # no substitutable GUID netref left
                break
        return rv
804     
805     def do_netrefs(self, data, fail_if_undefined = False):
806         # element netrefs
807         for (testbed_guid, guid), attrs in self._netrefs.items():
808             testbed = self._testbeds.get(testbed_guid)
809             if testbed is not None:
810                 for name in set(attrs):
811                     value = testbed.get(guid, name)
812                     if isinstance(value, basestring):
813                         ref_value = self.resolve_netref_value(value)
814                         if ref_value is not None:
815                             testbed.set(guid, name, ref_value)
816                             attrs.remove(name)
817                         elif fail_if_undefined:
818                             raise ValueError, "Unresolvable netref in: %r=%r" % (name,value,)
819                 if not attrs:
820                     del self._netrefs[(testbed_guid, guid)]
821         
822         # testbed netrefs
823         for testbed_guid, attrs in self._testbed_netrefs.items():
824             tb_data = dict(data.get_attribute_data(testbed_guid))
825             if data:
826                 for name in set(attrs):
827                     value = tb_data.get(name)
828                     if isinstance(value, basestring):
829                         ref_value = self.resolve_netref_value(value)
830                         if ref_value is not None:
831                             data.set_attribute_data(testbed_guid, name, ref_value)
832                             attrs.remove(name)
833                         elif fail_if_undefined:
834                             raise ValueError, "Unresolvable netref in: %r" % (value,)
835                 if not attrs:
836                     del self._testbed_netrefs[testbed_guid]
837         
838
839     def _init_testbed_controllers(self, data, recover = False):
840         blacklist_testbeds = set(self._testbeds)
841         element_guids = list()
842         label_guids = dict()
843         data_guids = data.guids
844         to_recover = set()
845         to_restart = set()
846
847         # gather label associations
848         for guid in data_guids:
849             if not data.is_testbed_data(guid):
850                 (testbed_guid, factory_id) = data.get_box_data(guid)
851                 label = data.get_attribute_data(guid, "label")
852                 if label is not None:
853                     if label in label_guids:
854                         raise RuntimeError, "Label %r is not unique" % (label,)
855                     label_guids[label] = guid
856
857         # create testbed controllers
858         for guid in data_guids:
859             if data.is_testbed_data(guid):
860                 if guid not in self._testbeds:
861                     try:
862                         self._create_testbed_controller(
863                             guid, data, element_guids, recover)
864                         if recover:
865                             # Already programmed
866                             blacklist_testbeds.add(guid)
867                         else:
868                             to_restart.add(guid)
869                     except:
870                         if recover:
871                             policy = self._testbed_recovery_policy(guid, data=data)
872                             if policy == DC.POLICY_RECOVER:
873                                 self._create_testbed_controller(
874                                     guid, data, element_guids, False)
875                                 to_recover.add(guid)
876                             elif policy == DC.POLICY_RESTART:
877                                 self._create_testbed_controller(
878                                     guid, data, element_guids, False)
879                                 to_restart.add(guid)
880                             else:
881                                 # Mark failed
882                                 self._failed_testbeds.add(guid)
883                         else:
884                             raise
885         
886         # queue programmable elements
887         #  - that have not been programmed already (blacklist_testbeds)
888         #  - including recovered or restarted testbeds
889         #  - but those that have no unresolved netrefs
890         for guid in data_guids:
891             if not data.is_testbed_data(guid):
892                 (testbed_guid, factory_id) = data.get_box_data(guid)
893                 if testbed_guid not in blacklist_testbeds:
894                     element_guids.append(guid)
895
896         # replace references to elements labels for its guid
897         self._resolve_labels(data, data_guids, label_guids)
898     
899         # program testbed controllers
900         if element_guids:
901             self._program_testbed_controllers(element_guids, data)
902         
903         return to_recover, to_restart
904
905     def _resolve_labels(self, data, data_guids, label_guids):
906         netrefs = self._netrefs
907         testbed_netrefs = self._testbed_netrefs
908         for guid in data_guids:
909             for name, value in data.get_attribute_data(guid):
910                 if isinstance(value, basestring):
911                     while True:
912                         for match in ATTRIBUTE_PATTERN_BASE.finditer(value):
913                             label = match.group("label")
914                             if not label.startswith('GUID-'):
915                                 ref_guid = label_guids.get(label)
916                                 if ref_guid is not None:
917                                     value = value.replace(
918                                         match.group(),
919                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
920                                             guid = 'GUID-%d' % (ref_guid,),
921                                             expr = match.group("expr"),
922                                             label = label)
923                                     )
924                                     data.set_attribute_data(guid, name, value)
925                                     
926                                     # memorize which guid-attribute pairs require
927                                     # postprocessing, to avoid excessive controller-testbed
928                                     # communication at configuration time
929                                     # (which could require high-latency network I/O)
930                                     if not data.is_testbed_data(guid):
931                                         (testbed_guid, factory_id) = data.get_box_data(guid)
932                                         netrefs[(testbed_guid, guid)].add(name)
933                                     else:
934                                         testbed_netrefs[guid].add(name)
935                                     
936                                     break
937                         else:
938                             break
939
940     def _create_testbed_controller(self, guid, data, element_guids, recover):
941         (testbed_id, testbed_version) = data.get_testbed_data(guid)
942         deployment_config = self._deployment_config.get(guid)
943         
944         # deferred import because proxy needs
945         # our class definitions to define proxies
946         import nepi.util.proxy as proxy
947         
948         if deployment_config is None:
949             # need to create one
950             deployment_config = proxy.AccessConfiguration()
951             
952             for (name, value) in data.get_attribute_data(guid):
953                 if value is not None and deployment_config.has_attribute(name):
954                     # if any deployment config attribute has a netref, we can't
955                     # create this controller yet
956                     if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
957                         # remember to re-issue this one
958                         self._netreffed_testbeds.add(guid)
959                         return
960                     
961                     # copy deployment config attribute
962                     deployment_config.set_attribute_value(name, value)
963             
964             # commit config
965             self._deployment_config[guid] = deployment_config
966         
967         if deployment_config is not None:
968             # force recovery mode 
969             deployment_config.set_attribute_value("recover",recover)
970         
971         testbed = proxy.create_testbed_controller(testbed_id, testbed_version,
972                 deployment_config)
973         for (name, value) in data.get_attribute_data(guid):
974             testbed.defer_configure(name, value)
975         self._testbeds[guid] = testbed
976         if guid in self._netreffed_testbeds:
977             self._netreffed_testbeds.remove(guid)
978
979     def _program_testbed_controllers(self, element_guids, data):
980         def resolve_create_netref(data, guid, name, value): 
981             # Try to resolve create-time netrefs, if possible
982             if isinstance(value, basestring) and ATTRIBUTE_PATTERN_BASE.search(value):
983                 try:
984                     nuvalue = self.resolve_netref_value(value)
985                 except:
986                     # Any trouble means we're not in shape to resolve the netref yet
987                     nuvalue = None
988                 if nuvalue is not None:
989                     # Only if we succeed we remove the netref deferral entry
990                     value = nuvalue
991                     data.set_attribute_data(guid, name, value)
992                     if (testbed_guid, guid) in self._netrefs:
993                         self._netrefs[(testbed_guid, guid)].discard(name)
994             return value
995
996         for guid in element_guids:
997             (testbed_guid, factory_id) = data.get_box_data(guid)
998             testbed = self._testbeds.get(testbed_guid)
999             if testbed is not None:
1000                 # create
1001                 testbed.defer_create(guid, factory_id)
1002                 # set attributes
1003                 for (name, value) in data.get_attribute_data(guid):
1004                     value = resolve_create_netref(data, guid, name, value)
1005                     testbed.defer_create_set(guid, name, value)
1006
1007         for guid in element_guids:
1008             (testbed_guid, factory_id) = data.get_box_data(guid)
1009             testbed = self._testbeds.get(testbed_guid)
1010             if testbed is not None:
1011                 # traces
1012                 for trace_id in data.get_trace_data(guid):
1013                     testbed.defer_add_trace(guid, trace_id)
1014                 # addresses
1015                 for (address, netprefix, broadcast) in data.get_address_data(guid):
1016                     if address != None:
1017                         testbed.defer_add_address(guid, address, netprefix, 
1018                                 broadcast)
1019                 # routes
1020                 for (destination, netprefix, nexthop, metric) in data.get_route_data(guid):
1021                     testbed.defer_add_route(guid, destination, netprefix, nexthop, metric)
1022                 # store connections data
1023                 for (connector_type_name, other_guid, other_connector_type_name) \
1024                         in data.get_connection_data(guid):
1025                     (other_testbed_guid, other_factory_id) = data.get_box_data(
1026                             other_guid)
1027                     if testbed_guid == other_testbed_guid:
1028                         # each testbed should take care of enforcing internal
1029                         # connection simmetry, so each connection is only
1030                         # added in one direction
1031                         testbed.defer_connect(guid, connector_type_name, 
1032                                 other_guid, other_connector_type_name)
1033
1034     def _program_testbed_cross_connections(self, data):
1035         data_guids = data.guids
1036         for guid in data_guids: 
1037             if not data.is_testbed_data(guid):
1038                 (testbed_guid, factory_id) = data.get_box_data(guid)
1039                 testbed = self._testbeds.get(testbed_guid)
1040                 if testbed is not None:
1041                     for (connector_type_name, cross_guid, cross_connector_type_name) \
1042                             in data.get_connection_data(guid):
1043                         (testbed_guid, factory_id) = data.get_box_data(guid)
1044                         (cross_testbed_guid, cross_factory_id) = data.get_box_data(
1045                                 cross_guid)
1046                         if testbed_guid != cross_testbed_guid:
1047                             cross_testbed = self._testbeds[cross_testbed_guid]
1048                             cross_testbed_id = cross_testbed.testbed_id
1049                             testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
1050                                     cross_testbed_guid, cross_testbed_id, cross_factory_id, 
1051                                     cross_connector_type_name)
1052                             # save cross data for later
1053                             self._logger.debug("ExperimentController: adding cross_connection data tbd=%d:guid=%d - tbd=%d:guid=%d" % \
1054                                     (testbed_guid, guid, cross_testbed_guid, cross_guid))
1055                             self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
1056                                     cross_guid)
1057
1058     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
1059         if testbed_guid not in self._cross_data:
1060             self._cross_data[testbed_guid] = dict()
1061         if cross_testbed_guid not in self._cross_data[testbed_guid]:
1062             self._cross_data[testbed_guid][cross_testbed_guid] = set()
1063         self._cross_data[testbed_guid][cross_testbed_guid].add(cross_guid)
1064
    def _get_cross_data(self, testbed_guid):
        """Collect the attributes of every cross-connected peer of ``testbed_guid``.

        Peers are the ones recorded by _add_crossdata. Returns a nested dict
        ``{cross_testbed_guid: {cross_guid: {attr_name: value}}}``. Each
        innermost dict also carries the ``_guid``, ``_testbed_guid``,
        ``_testbed_id`` and ``_testbed_version`` keys. An empty dict is
        returned when nothing was recorded for ``testbed_guid``.

        Requests are issued in two batches (first every attribute list, then
        every attribute value) and only resolved afterwards with _undefer.
        NOTE(review): the *_deferred calls presumably return lazy results so
        that remote round-trips can be batched -- confirm in the proxy layer.
        Keep the statement order: it determines how requests are batched.

        Raises KeyError if a recorded cross testbed has no controller in
        self._testbeds.
        """
        cross_data = dict()
        if not testbed_guid in self._cross_data:
            return cross_data

        # fetch attribute lists in one batch
        attribute_lists = dict()
        for cross_testbed_guid, guid_list in \
                self._cross_data[testbed_guid].iteritems():
            cross_testbed = self._testbeds[cross_testbed_guid]
            for cross_guid in guid_list:
                attribute_lists[(cross_testbed_guid, cross_guid)] = \
                    cross_testbed.get_attribute_list_deferred(cross_guid)

        # fetch attribute values in another batch
        for cross_testbed_guid, guid_list in \
                self._cross_data[testbed_guid].iteritems():
            cross_data[cross_testbed_guid] = dict()
            cross_testbed = self._testbeds[cross_testbed_guid]
            for cross_guid in guid_list:
                # identity of the peer element, stored alongside its attributes
                elem_cross_data = dict(
                    _guid = cross_guid,
                    _testbed_guid = cross_testbed_guid,
                    _testbed_id = cross_testbed.testbed_id,
                    _testbed_version = cross_testbed.testbed_version)
                cross_data[cross_testbed_guid][cross_guid] = elem_cross_data
                attribute_list = attribute_lists[(cross_testbed_guid,cross_guid)]
                # NOTE(review): iterating a deferred list presumably forces
                # its resolution -- confirm in the proxy layer
                for attr_name in attribute_list:
                    attr_value = cross_testbed.get_deferred(cross_guid, attr_name)
                    elem_cross_data[attr_name] = attr_value
        
        # undefer all values - we'll have to serialize them probably later
        # (only existing keys are reassigned, so mutating while iterating is safe)
        for cross_testbed_guid, testbed_cross_data in cross_data.iteritems():
            for cross_guid, elem_cross_data in testbed_cross_data.iteritems():
                for attr_name, attr_value in elem_cross_data.iteritems():
                    elem_cross_data[attr_name] = _undefer(attr_value)
        
        return cross_data
1103
class ExperimentSuite(object):
    """Run one experiment description repeatedly, one repetition at a time.

    Every repetition gets its own experiment controller. The controller is
    created in daemon deployment mode from a copy of ``access_config`` whose
    root directory is suffixed with ``_<repetition number>``. Repetitions are
    numbered from 1 and run sequentially in a background thread launched by
    start().
    """

    def __init__(self, experiment_xml, access_config, repetitions = None,
            duration = None, wait_guids = None):
        """
        :param experiment_xml: experiment description given to every controller.
        :param access_config: template access configuration. Only attributes
            with a truthy value are copied into each repetition's configuration.
        :param repetitions: number of repetitions (1 when not given or falsy).
        :param duration: minimum run time of each repetition, in seconds.
        :param wait_guids: element guids that must all report finished before
            a repetition is stopped.
        """
        self._experiment_xml = experiment_xml
        self._access_config = access_config
        # repetition number -> experiment controller / access configuration
        self._controllers = dict()
        self._access_configs = dict()
        self._repetitions = 1 if not repetitions else repetitions
        self._duration = duration
        self._wait_guids = wait_guids
        self._current = None
        self._status = TS.STATUS_ZERO
        self._thread = None

    def current(self):
        """Return the number of the repetition being (or last) run, or None."""
        return self._current

    def status(self):
        """Return the suite status (a TestbedStatus constant)."""
        return self._status

    def is_finished(self):
        """Return True once the suite thread has stopped running repetitions."""
        return self._status == TS.STATUS_STOPPED

    def get_access_configurations(self):
        """Return the access configurations of the repetitions started so far."""
        return self._access_configs.values()

    def start(self):
        """Run all repetitions in a background thread."""
        self._status = TS.STATUS_STARTED
        self._thread = threading.Thread(target = self._run_experiment_suite)
        self._thread.start()

    def shutdown(self):
        """Wait for the suite thread to end, then shut every controller down."""
        if self._thread:
            self._thread.join()
            self._thread = None
        for controller in self._controllers.values():
            controller.shutdown()

    def get_current_access_config(self):
        """Return the access configuration of the current repetition."""
        return self._access_configs[self._current]

    def _run_experiment_suite(self):
        # Repetitions are numbered 1..self._repetitions inclusive; xrange's
        # upper bound is exclusive, hence the + 1.
        try:
            for i in xrange(1, self._repetitions + 1):
                self._current = i
                self._run_one_experiment()
        finally:
            # Report the suite as stopped even if a repetition raised, so
            # that is_finished() does not stay False forever.
            self._status = TS.STATUS_STOPPED

    def _run_one_experiment(self):
        """Deploy, run and stop the experiment for repetition self._current."""
        from nepi.util import proxy
        access_config = proxy.AccessConfiguration()
        for attr in self._access_config.attributes:
            if attr.value:
                access_config.set_attribute_value(attr.name, attr.value)
        access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
        root_dir = "%s_%d" % (
                access_config.get_attribute_value(DC.ROOT_DIRECTORY),
                self._current)
        access_config.set_attribute_value(DC.ROOT_DIRECTORY, root_dir)
        controller = proxy.create_experiment_controller(self._experiment_xml,
                access_config)
        self._access_configs[self._current] = access_config
        self._controllers[self._current] = controller
        controller.start()
        started_at = time.time()
        # wait until all specified guids have finished execution
        if self._wait_guids:
            while not all(controller.is_finished(guid)
                    for guid in self._wait_guids):
                time.sleep(0.5)
        # wait until the minimum experiment duration time has elapsed
        if self._duration:
            while (time.time() - started_at) < self._duration:
                time.sleep(0.5)
        controller.stop()
1177