Ticket #28: Refactor Box classes to use mixins, and provide read-only routes/addesses
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import proxy, validation
6 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13
14 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
15 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
16 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
17
18 class ConnectorType(object):
19     def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
20         super(ConnectorType, self).__init__()
21         if max == -1:
22             max = sys.maxint
23         elif max <= 0:
24                 raise RuntimeError(
25              "The maximum number of connections allowed need to be more than 0")
26         if min < 0:
27             raise RuntimeError(
28              "The minimum number of connections allowed needs to be at least 0")
29         # connector_type_id -- univoquely identifies a connector type 
30         # across testbeds
31         self._connector_type_id = (testbed_id.lower(), factory_id.lower(), 
32                 name.lower())
33         # name -- display name for the connector type
34         self._name = name
35         # max -- maximum amount of connections that this type support, 
36         # -1 for no limit
37         self._max = max
38         # min -- minimum amount of connections required by this type of connector
39         self._min = min
40         # from_connections -- connections where the other connector is the "From"
41         # to_connections -- connections where the other connector is the "To"
42         # keys in the dictionary correspond to the 
43         # connector_type_id for possible connections. The value is a tuple:
44         # (can_cross, connect)
45         # can_cross: indicates if the connection is allowed accros different
46         #    testbed instances
47         # code: is the connection function to be invoked when the elements
48         #    are connected
49         self._from_connections = dict()
50         self._to_connections = dict()
51
52     @property
53     def connector_type_id(self):
54         return self._connector_type_id
55
56     @property
57     def name(self):
58         return self._name
59
60     @property
61     def max(self):
62         return self._max
63
64     @property
65     def min(self):
66         return self._min
67
68     def add_from_connection(self, testbed_id, factory_id, name, can_cross, code):
69         self._from_connections[(testbed_id.lower(), factory_id.lower(),
70             name.lower())] = (can_cross, code)
71
72     def add_to_connection(self, testbed_id, factory_id, name, can_cross, code):
73         self._to_connections[(testbed_id.lower(), factory_id.lower(), 
74             name.lower())] = (can_cross, code)
75
76     def can_connect(self, testbed_id, factory_id, name, count, 
77             must_cross = False):
78         connector_type_id = (testbed_id.lower(), factory_id.lower(),
79             name.lower())
80         if connector_type_id in self._from_connections:
81             (can_cross, code) = self._from_connections[connector_type_id]
82         elif connector_type_id in self._to_connections:
83             (can_cross, code) = self._to_connections[connector_type_id]
84         else:
85             return False
86         return not must_cross or can_cross
87
88     def code_to_connect(self, testbed_id, factory_id, name):
89         connector_type_id = (testbed_id.lower(), factory_id.lower(), 
90             name.lower())        
91         if not connector_type_id in self._to_connections.keys():
92             return False
93         (can_cross, code) = self._to_connections[connector_type_id]
94         return code
95
96 # TODO: create_function, start_function, stop_function, status_function 
97 # need a definition!
98 class Factory(AttributesMap):
99     def __init__(self, factory_id, create_function, start_function, 
100             stop_function, status_function, 
101             configure_function, preconfigure_function,
102             allow_addresses = False, has_addresses = False,
103             allow_routes = False, has_routes = False):
104         super(Factory, self).__init__()
105         self._factory_id = factory_id
106         self._allow_addresses = bool(allow_addresses)
107         self._allow_routes = bool(allow_routes)
108         self._has_addresses = bool(has_addresses) or self._allow_addresses
109         self._has_routes = bool(has_routes) or self._allow_routes
110         self._create_function = create_function
111         self._start_function = start_function
112         self._stop_function = stop_function
113         self._status_function = status_function
114         self._configure_function = configure_function
115         self._preconfigure_function = preconfigure_function
116         self._connector_types = dict()
117         self._traces = list()
118         self._box_attributes = AttributesMap()
119
120     @property
121     def factory_id(self):
122         return self._factory_id
123
124     @property
125     def allow_addresses(self):
126         return self._allow_addresses
127
128     @property
129     def allow_routes(self):
130         return self._allow_routes
131
132     @property
133     def has_addresses(self):
134         return self._has_addresses
135
136     @property
137     def has_routes(self):
138         return self._has_routes
139
140     @property
141     def box_attributes(self):
142         return self._box_attributes
143
144     @property
145     def create_function(self):
146         return self._create_function
147
148     @property
149     def start_function(self):
150         return self._start_function
151
152     @property
153     def stop_function(self):
154         return self._stop_function
155
156     @property
157     def status_function(self):
158         return self._status_function
159
160     @property
161     def configure_function(self):
162         return self._configure_function
163
164     @property
165     def preconfigure_function(self):
166         return self._preconfigure_function
167
168     @property
169     def traces(self):
170         return self._traces
171
172     def connector_type(self, name):
173         return self._connector_types[name]
174
175     def add_connector_type(self, connector_type):
176         self._connector_types[connector_type.name] = connector_type
177
178     def add_trace(self, trace_id):
179         self._traces.append(trace_id)
180
181     def add_box_attribute(self, name, help, type, value = None, range = None,
182         allowed = None, flags = Attribute.NoFlags, validation_function = None):
183         self._box_attributes.add_attribute(name, help, type, value, range, 
184                 allowed, flags, validation_function)
185
186 class TestbedController(object):
187     def __init__(self, testbed_id, testbed_version):
188         self._testbed_id = testbed_id
189         self._testbed_version = testbed_version
190
191     @property
192     def guids(self):
193         raise NotImplementedError
194
195     def defer_configure(self, name, value):
196         """Instructs setting a configuartion attribute for the testbed instance"""
197         raise NotImplementedError
198
199     def defer_create(self, guid, factory_id):
200         """Instructs creation of element """
201         raise NotImplementedError
202
203     def defer_create_set(self, guid, name, value):
204         """Instructs setting an initial attribute on an element"""
205         raise NotImplementedError
206
207     def defer_factory_set(self, guid, name, value):
208         """Instructs setting an attribute on a factory"""
209         raise NotImplementedError
210
211     def defer_connect(self, guid1, connector_type_name1, guid2, 
212             connector_type_name2): 
213         """Instructs creation of a connection between the given connectors"""
214         raise NotImplementedError
215
216     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
217             cross_testbed_id, cross_factory_id, cross_connector_type_name):
218         """
219         Instructs creation of a connection between the given connectors 
220         of different testbed instances
221         """
222         raise NotImplementedError
223
224     def defer_add_trace(self, guid, trace_id):
225         """Instructs the addition of a trace"""
226         raise NotImplementedError
227
228     def defer_add_address(self, guid, address, netprefix, broadcast): 
229         """Instructs the addition of an address"""
230         raise NotImplementedError
231
232     def defer_add_route(self, guid, destination, netprefix, nexthop):
233         """Instructs the addition of a route"""
234         raise NotImplementedError
235
236     def do_setup(self):
237         """After do_setup the testbed initial configuration is done"""
238         raise NotImplementedError
239
240     def do_create(self):
241         """
242         After do_create all instructed elements are created and 
243         attributes setted
244         """
245         raise NotImplementedError
246
247     def do_connect(self):
248         """
249         After do_connect all internal connections between testbed elements
250         are done
251         """
252         raise NotImplementedError
253
254     def do_configure(self):
255         """After do_configure elements are configured"""
256         raise NotImplementedError
257
258     def do_cross_connect(self):
259         """
260         After do_cross_connect all external connections between different testbed 
261         elements are done
262         """
263         raise NotImplementedError
264
265     def start(self):
266         raise NotImplementedError
267
268     def stop(self):
269         raise NotImplementedError
270
271     def set(self, time, guid, name, value):
272         raise NotImplementedError
273
274     def get(self, time, guid, name):
275         raise NotImplementedError
276     
277     def get_route(self, guid, index, attribute):
278         """
279         Params:
280             
281             guid: guid of box to query
282             index: number of routing entry to fetch
283             attribute: one of Destination, NextHop, NetPrefix
284         """
285         raise NotImplementedError
286
287     def get_address(self, guid, index, attribute='Address'):
288         """
289         Params:
290             
291             guid: guid of box to query
292             index: number of inteface to select
293             attribute: one of Address, NetPrefix, Broadcast
294         """
295         raise NotImplementedError
296
297     def action(self, time, guid, action):
298         raise NotImplementedError
299
300     def status(self, guid):
301         raise NotImplementedError
302
303     def trace(self, guid, trace_id, attribute='value'):
304         raise NotImplementedError
305
306     def shutdown(self):
307         raise NotImplementedError
308
309 class ExperimentController(object):
310     def __init__(self, experiment_xml, root_dir):
311         self._experiment_xml = experiment_xml
312         self._testbeds = dict()
313         self._access_config = dict()
314         self._netrefs = dict()
315         self._crossdata = dict()
316         self._root_dir = root_dir
317
318         self.persist_experiment_xml()
319
320     @property
321     def experiment_xml(self):
322         return self._experiment_xml
323
324     def persist_experiment_xml(self):
325         xml_path = os.path.join(self._root_dir, "experiment.xml")
326         f = open(xml_path, "w")
327         f.write(self._experiment_xml)
328         f.close()
329
330     def set_access_configuration(self, testbed_guid, access_config):
331         self._access_config[testbed_guid] = access_config
332
333     def trace(self, testbed_guid, guid, trace_id, attribute='value'):
334         return self._testbeds[testbed_guid].trace(guid, trace_id, attribute)
335
336     @staticmethod
337     def _parallel(callables):
338         threads = [ threading.Thread(target=callable) for callable in callables ]
339         for thread in threads:
340             thread.start()
341         for thread in threads:
342             thread.join()
343
344     def start(self):
345         self._init_testbed_controllers()
346         
347         # persist testbed connection data, for potential recovery
348         self._persist_testbed_proxies()
349         
350         # perform setup in parallel for all test beds,
351         # wait for all threads to finish
352         self._parallel([testbed.do_setup 
353                         for testbed in self._testbeds.itervalues()])
354         
355         # perform create-connect in parallel, wait
356         # (internal connections only)
357         self._parallel([lambda : (testbed.do_create(), 
358                                   testbed.do_connect(),
359                                   testbed.do_preconfigure())
360                         for testbed in self._testbeds.itervalues()])
361         
362         # resolve netrefs
363         self.do_netrefs(fail_if_undefined=True)
364         
365         # perform do_configure in parallel for al testbeds
366         # (it's internal configuration for each)
367         self._parallel([testbed.do_configure
368                         for testbed in self._testbeds.itervalues()])
369
370         # cross-connect (cannot be done in parallel)
371         for testbed in self._testbeds.values():
372             testbed.do_cross_connect()
373         
374         # start experiment (parallel start on all testbeds)
375         self._parallel([testbed.start
376                         for testbed in self._testbeds.itervalues()])
377
378     def _persist_testbed_proxies(self):
379         TRANSIENT = ('Recover',)
380         
381         # persist access configuration for all testbeds, so that
382         # recovery mode can reconnect to them if it becomes necessary
383         conf = ConfigParser.RawConfigParser()
384         for testbed_guid, testbed_config in self._access_config.iteritems():
385             testbed_guid = str(testbed_guid)
386             conf.add_section(testbed_guid)
387             for attr in testbed_config.attributes_name:
388                 if attr not in TRANSIENT:
389                     conf.set(testbed_guid, attr, 
390                         testbed_config.get_attribute_value(attr))
391         
392         f = open(os.path.join(self._root_dir, 'access_config.ini'), 'w')
393         conf.write(f)
394         f.close()
395     
396     def _load_testbed_proxies(self):
397         TYPEMAP = {
398             STRING : 'get',
399             INTEGER : 'getint',
400             FLOAT : 'getfloat',
401             BOOLEAN : 'getboolean',
402         }
403         
404         conf = ConfigParser.RawConfigParser()
405         conf.read(os.path.join(self._root_dir, 'access_config.ini'))
406         for testbed_guid in conf.sections():
407             testbed_config = proxy.AccessConfiguration()
408             for attr in conf.options(testbed_guid):
409                 testbed_config.set_attribute_value(attr, 
410                     conf.get(testbed_guid, attr) )
411                 
412             testbed_guid = str(testbed_guid)
413             conf.add_section(testbed_guid)
414             for attr in testbed_config.attributes_name:
415                 if attr not in TRANSIENT:
416                     getter = getattr(conf, TYPEMAP.get(
417                         testbed_config.get_attribute_type(attr),
418                         'get') )
419                     testbed_config.set_attribute_value(
420                         testbed_guid, attr, getter(attr))
421     
422     def _unpersist_testbed_proxies(self):
423         try:
424             os.remove(os.path.join(self._root_dir, 'access_config.ini'))
425         except:
426             # Just print exceptions, this is just cleanup
427             import traceback
428             traceback.print_exc(file=sys.stderr)
429
430     def stop(self):
431        for testbed in self._testbeds.values():
432            testbed.stop()
433        self._unpersist_testbed_proxies()
434    
435     def recover(self):
436         # reload perviously persisted testbed access configurations
437         self._load_testbed_proxies()
438         
439         # recreate testbed proxies by reconnecting only
440         self._init_testbed_controllers(recover = True)
441
442     def is_finished(self, guid):
443         for testbed in self._testbeds.values():
444             for guid_ in testbed.guids:
445                 if guid_ == guid:
446                     return testbed.status(guid) == STATUS_FINISHED
447         raise RuntimeError("No element exists with guid %d" % guid)    
448
449     def shutdown(self):
450        for testbed in self._testbeds.values():
451            testbed.shutdown()
452
453     @staticmethod
454     def _netref_component_split(component):
455         match = COMPONENT_PATTERN.match(component)
456         if match:
457             return match.group("kind"), match.group("index")
458         else:
459             return component, None
460
461     def do_netrefs(self, fail_if_undefined = False):
462         COMPONENT_GETTERS = {
463             'addr' :
464                 lambda testbed, guid, index, name : 
465                     testbed.get_address(guid, index, name),
466             'route' :
467                 lambda testbed, guid, index, name : 
468                     testbed.get_route(guid, index, name),
469             'trace' :
470                 lambda testbed, guid, index, name : 
471                     testbed.trace(guid, index, name),
472             '' : 
473                 lambda testbed, guid, index, name : 
474                     testbed.get(TIME_NOW, guid, name),
475         }
476         
477         for (testbed_guid, guid), attrs in self._netrefs.iteritems():
478             testbed = self._testbeds[testbed_guid]
479             for name in attrs:
480                 value = testbed.get(TIME_NOW, guid, name)
481                 if isinstance(value, basestring):
482                     match = ATTRIBUTE_PATTERN_BASE.search(value)
483                     if match:
484                         label = match.group("label")
485                         if label.startswith('GUID-'):
486                             ref_guid = int(label[5:])
487                             if ref_guid:
488                                 expr = match.group("expr")
489                                 component = match.group("component")[1:] # skip the dot
490                                 attribute = match.group("attribute")
491                                 
492                                 # split compound components into component kind and index
493                                 # eg: 'addr[0]' -> ('addr', '0')
494                                 component, component_index = self._netref_component_split(component)
495                                 
496                                 # find object and resolve expression
497                                 for ref_testbed in self._testbeds.itervalues():
498                                     if component not in COMPONENT_GETTERS:
499                                         raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
500                                     else:
501                                         ref_value = COMPONENT_GETTERS[component](
502                                             ref_testbed, ref_guid, component_index, attribute)
503                                         if ref_value:
504                                             testbed.set(TIME_NOW, guid, name, 
505                                                 value.replace(match.group(), ref_value))
506                                             break
507                                 else:
508                                     # couldn't find value
509                                     if fail_if_undefined:
510                                         raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
511
512     def _init_testbed_controllers(self, recover = False):
513         parser = XmlExperimentParser()
514         data = parser.from_xml_to_data(self._experiment_xml)
515         element_guids = list()
516         label_guids = dict()
517         data_guids = data.guids
518
519         # create testbed controllers
520         for guid in data_guids:
521             if data.is_testbed_data(guid):
522                 self._create_testbed_controller(guid, data, element_guids, 
523                         recover)
524             else:
525                 element_guids.append(guid)
526                 label = data.get_attribute_data(guid, "label")
527                 if label is not None:
528                     if label in label_guids:
529                         raise RuntimeError, "Label %r is not unique" % (label,)
530                     label_guids[label] = guid
531
532         # replace references to elements labels for its guid
533         self._resolve_labels(data, data_guids, label_guids)
534     
535         # program testbed controllers
536         if not recover:
537             self._program_testbed_controllers(element_guids, data)
538
539     def _resolve_labels(self, data, data_guids, label_guids):
540         netrefs = self._netrefs
541         for guid in data_guids:
542             if not data.is_testbed_data(guid):
543                 for name, value in data.get_attribute_data(guid):
544                     if isinstance(value, basestring):
545                         match = ATTRIBUTE_PATTERN_BASE.search(value)
546                         if match:
547                             label = match.group("label")
548                             if not label.startswith('GUID-'):
549                                 ref_guid = label_guids.get(label)
550                                 if ref_guid is not None:
551                                     value = ATTRIBUTE_PATTERN_BASE.sub(
552                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
553                                             guid = 'GUID-%d' % (ref_guid,),
554                                             expr = match.group("expr"),
555                                             label = label), 
556                                         value)
557                                     data.set_attribute_data(guid, name, value)
558                                     
559                                     # memorize which guid-attribute pairs require
560                                     # postprocessing, to avoid excessive controller-testbed
561                                     # communication at configuration time
562                                     # (which could require high-latency network I/O)
563                                     (testbed_guid, factory_id) = data.get_box_data(guid)
564                                     netrefs.setdefault((testbed_guid, guid), set()).add(name)
565
566     def _create_testbed_controller(self, guid, data, element_guids, recover):
567         (testbed_id, testbed_version) = data.get_testbed_data(guid)
568         access_config = None if guid not in self._access_config else\
569                 self._access_config[guid]
570         
571         if recover and access_config is None:
572             # need to create one
573             access_config = self._access_config[guid] = proxy.AccessConfiguration()
574         if access_config is not None:
575             # force recovery mode 
576             access_config.set_attribute_value("recover",recover)
577         
578         testbed = proxy.create_testbed_controller(testbed_id, 
579                 testbed_version, access_config)
580         for (name, value) in data.get_attribute_data(guid):
581             testbed.defer_configure(name, value)
582         self._testbeds[guid] = testbed
583
584     def _program_testbed_controllers(self, element_guids, data):
585         for guid in element_guids:
586             (testbed_guid, factory_id) = data.get_box_data(guid)
587             testbed = self._testbeds[testbed_guid]
588             testbed.defer_create(guid, factory_id)
589             for (name, value) in data.get_attribute_data(guid):
590                 testbed.defer_create_set(guid, name, value)
591
592         for guid in element_guids: 
593             (testbed_guid, factory_id) = data.get_box_data(guid)
594             testbed = self._testbeds[testbed_guid]
595             for (connector_type_name, other_guid, other_connector_type_name) \
596                     in data.get_connection_data(guid):
597                 (testbed_guid, factory_id) = data.get_box_data(guid)
598                 (other_testbed_guid, other_factory_id) = data.get_box_data(
599                         other_guid)
600                 if testbed_guid == other_testbed_guid:
601                     testbed.defer_connect(guid, connector_type_name, other_guid, 
602                         other_connector_type_name)
603                 else:
604                     testbed.defer_cross_connect(guid, connector_type_name, other_guid, 
605                         other_testbed_id, other_factory_id, other_connector_type_name)
606             for trace_id in data.get_trace_data(guid):
607                 testbed.defer_add_trace(guid, trace_id)
608             for (autoconf, address, netprefix, broadcast) in \
609                     data.get_address_data(guid):
610                 if address != None:
611                     testbed.defer_add_address(guid, address, netprefix, broadcast)
612             for (destination, netprefix, nexthop) in data.get_route_data(guid):
613                 testbed.defer_add_route(guid, destination, netprefix, nexthop)
614