Ticket #23: rename TestbedInstance -> TestbedController
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import proxy, validation
6 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11 import ConfigParser
12 import os
13
# Matches a "netref" expression embedded in an attribute value. Its shape is
#   {#[<label>]<expr>#}
# where <expr> is an optional component selector (".addr[N]", ".route[N]" or
# ".trace[N]"), one separator character, and "[<attribute>]".
# Named groups: label, expr, component, attribute.
# NOTE(review): the separator in front of "[<attribute>]" is an unescaped "."
# and so matches ANY single character -- presumably a literal dot was meant;
# confirm before tightening the pattern.
ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
# %-template (keys: guid, expr) used to rebuild a netref once its label has
# been replaced by a "GUID-<n>" reference.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
# Splits a compound component such as "addr[0]" into kind ("addr") and
# index ("0").
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
17
class ConnectorType(object):
    """Describes one kind of connector and the connections it accepts.

    A connector type is identified by the lower-cased triple
    (testbed_id, factory_id, name). For every peer connector type it can be
    connected with, it stores a (can_cross, code) pair, where can_cross tells
    whether the connection is allowed across different testbed instances and
    code is the connection function registered for that pair.
    """

    def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
        """
        Params:

            testbed_id, factory_id, name: components of the connector type
                id (stored lower-cased); name is also kept verbatim as the
                display name
            max: maximum amount of connections supported, -1 for no limit
            min: minimum amount of connections required

        Raises RuntimeError if max is 0 or negative (other than -1), or if
        min is negative.
        """
        super(ConnectorType, self).__init__()
        if max == -1:
            # "no limit": sys.maxint only exists on Python 2, so fall back
            # to sys.maxsize where it is missing
            max = getattr(sys, "maxint", sys.maxsize)
        elif max <= 0:
            raise RuntimeError(
             "The maximum number of connections allowed need to be more than 0")
        if min < 0:
            raise RuntimeError(
             "The minimum number of connections allowed needs to be at least 0")
        # connector_type_id -- uniquely identifies a connector type
        # across testbeds
        self._connector_type_id = self._make_type_id(testbed_id, factory_id,
                name)
        # name -- display name for the connector type
        self._name = name
        # max -- maximum amount of connections that this type supports
        self._max = max
        # min -- minimum amount of connections required by this type of
        # connector
        self._min = min
        # from_connections -- connections where the other connector is the
        #    "From"
        # to_connections -- connections where the other connector is the "To"
        # Keys are the connector_type_id of the possible peers, values are
        # (can_cross, code) tuples.
        self._from_connections = dict()
        self._to_connections = dict()

    @staticmethod
    def _make_type_id(testbed_id, factory_id, name):
        """Build the case-insensitive key used to identify a connector type"""
        return (testbed_id.lower(), factory_id.lower(), name.lower())

    @property
    def connector_type_id(self):
        return self._connector_type_id

    @property
    def name(self):
        return self._name

    @property
    def max(self):
        return self._max

    @property
    def min(self):
        return self._min

    def add_from_connection(self, testbed_id, factory_id, name, can_cross, code):
        """Registers a peer connector type acting as the "From" end"""
        peer_id = self._make_type_id(testbed_id, factory_id, name)
        self._from_connections[peer_id] = (can_cross, code)

    def add_to_connection(self, testbed_id, factory_id, name, can_cross, code):
        """Registers a peer connector type acting as the "To" end"""
        peer_id = self._make_type_id(testbed_id, factory_id, name)
        self._to_connections[peer_id] = (can_cross, code)

    def can_connect(self, testbed_id, factory_id, name, count, 
            must_cross = False):
        """
        Tells whether a connection with the given peer connector type is
        allowed.

        Returns False when the peer was never registered. Otherwise returns
        True when must_cross is false, and the registered can_cross value
        when must_cross is true. "From" registrations take precedence over
        "To" registrations.

        count is accepted for interface compatibility but is not checked
        against min/max here.
        """
        peer_id = self._make_type_id(testbed_id, factory_id, name)
        for registry in (self._from_connections, self._to_connections):
            if peer_id in registry:
                can_cross = registry[peer_id][0]
                return not must_cross or can_cross
        return False

    def code_to_connect(self, testbed_id, factory_id, name):
        """
        Returns the connection function registered with add_to_connection
        for the given peer connector type, or False if there is none.
        """
        peer_id = self._make_type_id(testbed_id, factory_id, name)
        entry = self._to_connections.get(peer_id)
        if entry is None:
            return False
        return entry[1]
95
96 # TODO: create_function, start_function, stop_function, status_function 
97 # need a definition!
class Factory(AttributesMap):
    """Holds everything registered for one kind of testbed element.

    Besides the attributes inherited from AttributesMap, a factory keeps the
    five functions handed to the constructor (create, start, stop, status,
    configure), the connector types and trace ids added to it, and a separate
    AttributesMap with the "box" attributes.

    The functions are stored as given; their calling convention is not
    defined in this module.
    """

    def __init__(self, factory_id, create_function, start_function, 
            stop_function, status_function, configure_function,
            allow_addresses = False, allow_routes = False):
        super(Factory, self).__init__()
        self._factory_id = factory_id

        # lifecycle functions, kept verbatim
        self._create_function = create_function
        self._start_function = start_function
        self._stop_function = stop_function
        self._status_function = status_function
        self._configure_function = configure_function

        # only a value equal to True enables these flags
        self._allow_addresses = (allow_addresses == True)
        self._allow_routes = (allow_routes == True)

        # registries filled in through the add_* methods
        self._connector_types = {}
        self._traces = []
        self._box_attributes = AttributesMap()

    @property
    def factory_id(self):
        """Identifier given at construction time"""
        return self._factory_id

    @property
    def allow_addresses(self):
        """True only if allow_addresses was passed as a value equal to True"""
        return self._allow_addresses

    @property
    def allow_routes(self):
        """True only if allow_routes was passed as a value equal to True"""
        return self._allow_routes

    @property
    def box_attributes(self):
        """AttributesMap holding the attributes added by add_box_attribute"""
        return self._box_attributes

    @property
    def create_function(self):
        return self._create_function

    @property
    def start_function(self):
        return self._start_function

    @property
    def stop_function(self):
        return self._stop_function

    @property
    def status_function(self):
        return self._status_function

    @property
    def configure_function(self):
        return self._configure_function

    @property
    def traces(self):
        """List of trace ids, in the order they were added"""
        return self._traces

    def connector_type(self, name):
        """Returns the connector type registered under name (KeyError if none)"""
        registered = self._connector_types
        return registered[name]

    def add_connector_type(self, connector_type):
        """Registers connector_type under its own name, replacing any previous one"""
        key = connector_type.name
        self._connector_types[key] = connector_type

    def add_trace(self, trace_id):
        """Appends trace_id to the list of traces"""
        self._traces.append(trace_id)

    def add_box_attribute(self, name, help, type, value = None, range = None,
        allowed = None, flags = Attribute.NoFlags, validation_function = None):
        """Adds an attribute to the box attributes map; arguments are passed through unchanged"""
        box_attributes = self._box_attributes
        box_attributes.add_attribute(name, help, type, value, range, allowed,
                flags, validation_function)
168
class TestbedController(object):
    """Abstract interface of a testbed controller.

    This base class only records the testbed id and version; every other
    member raises NotImplementedError and has to be provided by a subclass.
    The defer_* methods queue instructions and the do_* methods mark the
    points at which the queued instructions have taken effect.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id, self._testbed_version = testbed_id, testbed_version

    @property
    def guids(self):
        """Guids known to this testbed (iterated by ExperimentController.is_finished)"""
        raise NotImplementedError()

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError()

    def defer_create(self, guid, factory_id):
        """Instructs the creation of an element"""
        raise NotImplementedError()

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError()

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError()

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs the creation of a connection between the given connectors"""
        raise NotImplementedError()

    def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
            cross_testbed_id, cross_factory_id, cross_connector_type_name):
        """
        Instructs the creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError()

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError()

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError()

    def defer_add_route(self, guid, destination, netprefix, nexthop):
        """Instructs the addition of a route"""
        raise NotImplementedError()

    def do_setup(self):
        """After do_setup the initial configuration of the testbed is done"""
        raise NotImplementedError()

    def do_create(self):
        """
        After do_create all instructed elements are created and their
        attributes set
        """
        raise NotImplementedError()

    def do_connect(self):
        """
        After do_connect all internal connections between testbed elements
        are done
        """
        raise NotImplementedError()

    def do_configure(self):
        """After do_configure the elements are configured"""
        raise NotImplementedError()

    def do_cross_connect(self):
        """
        After do_cross_connect all external connections between elements of
        different testbeds are done
        """
        raise NotImplementedError()

    def start(self):
        """Not implemented in this base class"""
        raise NotImplementedError()

    def stop(self):
        """Not implemented in this base class"""
        raise NotImplementedError()

    def set(self, time, guid, name, value):
        """Not implemented in this base class"""
        raise NotImplementedError()

    def get(self, time, guid, name):
        """Not implemented in this base class"""
        raise NotImplementedError()

    def get_route(self, guid, index, attribute):
        """
        Params:

            guid: guid of the box to query
            index: number of the routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError()

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:

            guid: guid of the box to query
            index: number of the interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError()

    def action(self, time, guid, action):
        """Not implemented in this base class"""
        raise NotImplementedError()

    def status(self, guid):
        """Not implemented in this base class"""
        raise NotImplementedError()

    def trace(self, guid, trace_id, attribute='value'):
        """Not implemented in this base class"""
        raise NotImplementedError()

    def shutdown(self):
        """Not implemented in this base class"""
        raise NotImplementedError()
291
class ExperimentController(object):
    """Deploys and runs the experiment described by an XML document.

    The controller parses the experiment XML, creates one testbed proxy per
    testbed found in it, programs every element into its testbed and then
    drives all testbeds through the setup / create / connect / configure /
    cross-connect / start sequence.

    The experiment XML and the access configuration of every testbed are
    written under root_dir so that recover() can reconnect to the testbeds
    later on.
    """

    # Access configuration attributes that are only meaningful for the
    # current run and therefore must not be persisted. Names are compared
    # lower-cased, because _create_testbed_instances sets "recover".
    _TRANSIENT_ACCESS_ATTRS = frozenset(('recover',))

    def __init__(self, experiment_xml, root_dir):
        """
        Params:

            experiment_xml: experiment description, as an XML string
            root_dir: existing directory where experiment.xml and
                access_config.ini are written
        """
        self._experiment_xml = experiment_xml
        self._testbeds = dict()
        self._access_config = dict()
        self._netrefs = dict()
        self._root_dir = root_dir

        self.persist_experiment_xml()

    @property
    def experiment_xml(self):
        return self._experiment_xml

    def persist_experiment_xml(self):
        """Writes the experiment XML to <root_dir>/experiment.xml"""
        xml_path = os.path.join(self._root_dir, "experiment.xml")
        f = open(xml_path, "w")
        try:
            f.write(self._experiment_xml)
        finally:
            # close the file even if the write fails
            f.close()

    def set_access_configuration(self, testbed_guid, access_config):
        """Registers the access configuration to use for a testbed"""
        self._access_config[testbed_guid] = access_config

    def trace(self, testbed_guid, guid, trace_id, attribute='value'):
        """Delegates to trace() of the testbed registered under testbed_guid"""
        return self._testbeds[testbed_guid].trace(guid, trace_id, attribute)

    @staticmethod
    def _parallel(callables):
        """
        Runs every callable in its own thread and waits for all of them.

        Exceptions raised inside a callable are not propagated to the caller.
        """
        threads = [ threading.Thread(target=task) for task in callables ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def start(self):
        """Creates the testbed instances and takes them up to the started state"""
        self._create_testbed_instances()
        
        # persist testbed connection data, for potential recovery
        self._persist_testbed_proxies()
        
        testbeds = list(self._testbeds.values())
        
        # perform setup in parallel for all test beds,
        # wait for all threads to finish
        self._parallel([testbed.do_setup for testbed in testbeds])
        
        # perform create-connect in parallel, wait
        # (internal connections only)
        # The testbed is bound as a default argument: a plain closure would
        # be evaluated late and every thread would act on the last testbed.
        self._parallel([
            lambda testbed=testbed: (testbed.do_create(), 
                                     testbed.do_connect())
            for testbed in testbeds])
        
        # resolve netrefs
        self.do_netrefs(fail_if_undefined=True)
        
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure for testbed in testbeds])

        # cross-connect (cannot be done in parallel)
        for testbed in testbeds:
            testbed.do_cross_connect()
        
        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start for testbed in testbeds])

    def _persist_testbed_proxies(self):
        """
        Writes the access configuration of all testbeds to
        <root_dir>/access_config.ini, one section per testbed guid, so that
        recovery mode can reconnect to them if it becomes necessary.
        """
        conf = ConfigParser.RawConfigParser()
        for testbed_guid, testbed_config in self._access_config.items():
            section = str(testbed_guid)
            conf.add_section(section)
            for attr in testbed_config.attributes_name:
                if attr.lower() in self._TRANSIENT_ACCESS_ATTRS:
                    continue
                value = testbed_config.get_attribute_value(attr)
                if value is None:
                    # None would be written as the string "None" and could
                    # not be told apart from a real value when loading
                    continue
                conf.set(section, attr, value)
        
        f = open(os.path.join(self._root_dir, 'access_config.ini'), 'w')
        try:
            conf.write(f)
        finally:
            f.close()
    
    def _load_testbed_proxies(self):
        """
        Reloads the access configurations written by
        _persist_testbed_proxies into self._access_config.

        NOTE(review): the parser getter is chosen from the Python type of
        each attribute's default value in a fresh AccessConfiguration,
        because the attribute type constants are not available in this
        module -- confirm this matches the types declared by
        AccessConfiguration.
        """
        conf = ConfigParser.RawConfigParser()
        conf.read(os.path.join(self._root_dir, 'access_config.ini'))
        for section in conf.sections():
            testbed_config = proxy.AccessConfiguration()
            for attr in testbed_config.attributes_name:
                if attr.lower() in self._TRANSIENT_ACCESS_ATTRS:
                    continue
                if not conf.has_option(section, attr):
                    # not persisted: keep the default
                    continue
                default = testbed_config.get_attribute_value(attr)
                # bool has to be tested before int (bool is an int subclass)
                if isinstance(default, bool):
                    getter = conf.getboolean
                elif isinstance(default, int):
                    getter = conf.getint
                elif isinstance(default, float):
                    getter = conf.getfloat
                else:
                    getter = conf.get
                testbed_config.set_attribute_value(attr, 
                    getter(section, attr))
            
            # sections are named str(guid); presumably guids are integers
            # (they are formatted with %d elsewhere), so convert back when
            # possible to match the keys used by _create_testbed_instances
            try:
                testbed_guid = int(section)
            except ValueError:
                testbed_guid = section
            self._access_config[testbed_guid] = testbed_config
    
    def _unpersist_testbed_proxies(self):
        """Removes access_config.ini; failures are reported but not raised"""
        try:
            os.remove(os.path.join(self._root_dir, 'access_config.ini'))
        except Exception:
            # Just print exceptions, this is just cleanup
            import traceback
            traceback.print_exc(file=sys.stderr)

    def stop(self):
        """Stops every testbed and discards the persisted access configuration"""
        for testbed in self._testbeds.values():
            testbed.stop()
        self._unpersist_testbed_proxies()

    def recover(self):
        """Reconnects to the testbeds of a previously started experiment"""
        # reload previously persisted testbed access configurations
        self._load_testbed_proxies()
        
        # recreate testbed proxies by reconnecting only
        self._create_testbed_instances(recover=True)

    def is_finished(self, guid):
        """
        Tells whether the element with the given guid reports
        STATUS_FINISHED. Raises RuntimeError if no testbed knows the guid.
        """
        for testbed in self._testbeds.values():
            for guid_ in testbed.guids:
                if guid_ == guid:
                    return testbed.status(guid) == STATUS_FINISHED
        raise RuntimeError("No element exists with guid %d" % guid)

    def shutdown(self):
        """Shuts down every testbed"""
        for testbed in self._testbeds.values():
            testbed.shutdown()

    @staticmethod
    def _netref_component_split(component):
        """
        Splits a compound component into kind and index,
        eg: 'addr[0]' -> ('addr', '0'). A component without an index is
        returned as (component, None).
        """
        match = COMPONENT_PATTERN.match(component)
        if match:
            return match.group("kind"), match.group("index")
        else:
            return component, None

    def do_netrefs(self, fail_if_undefined = False):
        """
        Resolves the netrefs memorized by _create_testbed_instances.

        For every memorized (testbed, guid, attribute), the current value is
        fetched; if it contains a {#[GUID-n]...#} expression, the referenced
        value is looked up on each testbed in turn and the first non-empty
        result replaces the expression in the attribute value. Only the
        first expression found in a value is processed.

        Raises ValueError for an unknown component and, when
        fail_if_undefined is true, for a reference no testbed can resolve.
        """
        COMPONENT_GETTERS = {
            'addr' :
                lambda testbed, guid, index, name : 
                    testbed.get_address(guid, index, name),
            'route' :
                lambda testbed, guid, index, name : 
                    testbed.get_route(guid, index, name),
            'trace' :
                lambda testbed, guid, index, name : 
                    testbed.trace(guid, index, name),
            '' : 
                lambda testbed, guid, index, name : 
                    testbed.get(TIME_NOW, guid, name),
        }
        
        for (testbed_guid, guid), attrs in self._netrefs.items():
            testbed = self._testbeds[testbed_guid]
            for name in attrs:
                value = testbed.get(TIME_NOW, guid, name)
                if not isinstance(value, basestring):
                    continue
                match = ATTRIBUTE_PATTERN_BASE.search(value)
                if not match:
                    continue
                label = match.group("label")
                if not label.startswith('GUID-'):
                    continue
                ref_guid = int(label[5:])
                if not ref_guid:
                    continue
                
                expr = match.group("expr")
                component = match.group("component")[1:] # skip the dot
                attribute = match.group("attribute")
                
                # split compound components into component kind and index
                # eg: 'addr[0]' -> ('addr', '0')
                component, component_index = self._netref_component_split(component)
                if component not in COMPONENT_GETTERS:
                    raise ValueError(
                        "Malformed netref: %r - unknown component" % (expr,))
                getter = COMPONENT_GETTERS[component]
                
                # find object and resolve expression
                for ref_testbed in self._testbeds.values():
                    ref_value = getter(ref_testbed, ref_guid, 
                        component_index, attribute)
                    if ref_value:
                        # NOTE(review): the resolved value used to be
                        # computed and then dropped; it is now written back
                        # in place of the netref expression
                        resolved = "%s%s%s" % (value[:match.start()],
                            ref_value, value[match.end():])
                        testbed.set(TIME_NOW, guid, name, resolved)
                        break
                else:
                    # couldn't find value
                    if fail_if_undefined:
                        raise ValueError(
                            "Unresolvable GUID: %r, in netref: %r" % (
                                ref_guid, expr))

    def _create_testbed_instances(self, recover = False):
        """
        Parses the experiment XML and creates a testbed proxy for every
        testbed in it.

        Label-based netrefs found in element attributes are rewritten to
        their GUID-based form and memorized in self._netrefs. Unless recover
        is true, all elements are then programmed into their testbeds.

        Raises RuntimeError if two elements share the same label.
        """
        parser = XmlExperimentParser()
        data = parser.from_xml_to_data(self._experiment_xml)
        element_guids = list()
        label_guids = dict()
        data_guids = data.guids
        netrefs = self._netrefs
        for guid in data_guids:
            if data.is_testbed_data(guid):
                (testbed_id, testbed_version) = data.get_testbed_data(guid)
                access_config = self._access_config.get(guid)
                
                if recover and access_config is None:
                    # need to create one
                    access_config = self._access_config[guid] = proxy.AccessConfiguration()
                if access_config is not None:
                    # force recovery mode 
                    access_config.set_attribute_value("recover",recover)
                
                testbed = proxy.create_testbed_instance(testbed_id, 
                        testbed_version, access_config)
                for (name, value) in data.get_attribute_data(guid):
                    testbed.defer_configure(name, value)
                self._testbeds[guid] = testbed
            else:
                element_guids.append(guid)
                label = data.get_attribute_data(guid, "label")
                if label is not None:
                    if label in label_guids:
                        raise RuntimeError("Label %r is not unique" % (label,))
                    label_guids[label] = guid
        for guid in data_guids:
            if data.is_testbed_data(guid):
                continue
            for name, value in data.get_attribute_data(guid):
                if not isinstance(value, basestring):
                    continue
                match = ATTRIBUTE_PATTERN_BASE.search(value)
                if not match:
                    continue
                label = match.group("label")
                if label.startswith('GUID-'):
                    continue
                ref_guid = label_guids.get(label)
                if ref_guid is None:
                    continue
                value = ATTRIBUTE_PATTERN_BASE.sub(
                    ATTRIBUTE_PATTERN_GUID_SUB % dict(
                        guid='GUID-%d' % (ref_guid,),
                        expr=match.group("expr"),
                        label=label), 
                    value)
                data.set_attribute_data(guid, name, value)
                
                # memorize which guid-attribute pairs require
                # postprocessing, to avoid excessive controller-testbed
                # communication at configuration time
                # (which could require high-latency network I/O)
                (testbed_guid, factory_id) = data.get_box_data(guid)
                netrefs.setdefault((testbed_guid,guid),set()).add(name)
        if not recover:
            self._program_testbed_instances(element_guids, data)

    def _program_testbed_instances(self, element_guids, data):
        """
        Queues on every testbed the creation of its elements, followed by
        their connections, traces, addresses and routes.

        Connections between elements of the same testbed are queued with
        defer_connect; connections to an element of another testbed are
        queued with defer_cross_connect.
        """
        for guid in element_guids:
            (testbed_guid, factory_id) = data.get_box_data(guid)
            testbed = self._testbeds[testbed_guid]
            testbed.defer_create(guid, factory_id)
            for (name, value) in data.get_attribute_data(guid):
                testbed.defer_create_set(guid, name, value)

        for guid in element_guids: 
            (testbed_guid, factory_id) = data.get_box_data(guid)
            testbed = self._testbeds[testbed_guid]
            for (connector_type_name, other_guid, other_connector_type_name) \
                    in data.get_connection_data(guid):
                (other_testbed_guid, other_factory_id) = data.get_box_data(
                        other_guid)
                if testbed_guid == other_testbed_guid:
                    testbed.defer_connect(guid, connector_type_name, other_guid, 
                        other_connector_type_name)
                else:
                    # the peer testbed is identified by its testbed id,
                    # not by its guid
                    (other_testbed_id, other_testbed_version) = \
                        data.get_testbed_data(other_testbed_guid)
                    testbed.defer_cross_connect(guid, connector_type_name, other_guid, 
                        other_testbed_id, other_factory_id, other_connector_type_name)
            for trace_id in data.get_trace_data(guid):
                testbed.defer_add_trace(guid, trace_id)
            for (autoconf, address, netprefix, broadcast) in \
                    data.get_address_data(guid):
                if address is not None:
                    testbed.defer_add_address(guid, address, netprefix, broadcast)
            for (destination, netprefix, nexthop) in data.get_route_data(guid):
                testbed.defer_add_route(guid, destination, netprefix, nexthop)
580