Ticket #10: netrefs, initial implementation
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import proxy, validation
6 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10
11 ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
12 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
13 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
14
15 class ConnectorType(object):
16     def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
17         super(ConnectorType, self).__init__()
18         if max == -1:
19             max = sys.maxint
20         elif max <= 0:
21                 raise RuntimeError(
22              "The maximum number of connections allowed need to be more than 0")
23         if min < 0:
24             raise RuntimeError(
25              "The minimum number of connections allowed needs to be at least 0")
26         # connector_type_id -- univoquely identifies a connector type 
27         # across testbeds
28         self._connector_type_id = (testbed_id.lower(), factory_id.lower(), 
29                 name.lower())
30         # name -- display name for the connector type
31         self._name = name
32         # max -- maximum amount of connections that this type support, 
33         # -1 for no limit
34         self._max = max
35         # min -- minimum amount of connections required by this type of connector
36         self._min = min
37         # from_connections -- connections where the other connector is the "From"
38         # to_connections -- connections where the other connector is the "To"
39         # keys in the dictionary correspond to the 
40         # connector_type_id for possible connections. The value is a tuple:
41         # (can_cross, connect)
42         # can_cross: indicates if the connection is allowed accros different
43         #    testbed instances
44         # code: is the connection function to be invoked when the elements
45         #    are connected
46         self._from_connections = dict()
47         self._to_connections = dict()
48
49     @property
50     def connector_type_id(self):
51         return self._connector_type_id
52
53     @property
54     def name(self):
55         return self._name
56
57     @property
58     def max(self):
59         return self._max
60
61     @property
62     def min(self):
63         return self._min
64
65     def add_from_connection(self, testbed_id, factory_id, name, can_cross, code):
66         self._from_connections[(testbed_id.lower(), factory_id.lower(),
67             name.lower())] = (can_cross, code)
68
69     def add_to_connection(self, testbed_id, factory_id, name, can_cross, code):
70         self._to_connections[(testbed_id.lower(), factory_id.lower(), 
71             name.lower())] = (can_cross, code)
72
73     def can_connect(self, testbed_id, factory_id, name, count, 
74             must_cross = False):
75         connector_type_id = (testbed_id.lower(), factory_id.lower(),
76             name.lower())
77         if connector_type_id in self._from_connections:
78             (can_cross, code) = self._from_connections[connector_type_id]
79         elif connector_type_id in self._to_connections:
80             (can_cross, code) = self._to_connections[connector_type_id]
81         else:
82             return False
83         return not must_cross or can_cross
84
85     def code_to_connect(self, testbed_id, factory_id, name):
86         connector_type_id = (testbed_id.lower(), factory_id.lower(), 
87             name.lower())        
88         if not connector_type_id in self._to_connections.keys():
89             return False
90         (can_cross, code) = self._to_connections[connector_type_id]
91         return code
92
93 # TODO: create_function, start_function, stop_function, status_function 
94 # need a definition!
95 class Factory(AttributesMap):
96     def __init__(self, factory_id, create_function, start_function, 
97             stop_function, status_function, configure_function,
98             allow_addresses = False, allow_routes = False):
99         super(Factory, self).__init__()
100         self._factory_id = factory_id
101         self._allow_addresses = (allow_addresses == True)
102         self._allow_routes = (allow_routes == True)
103         self._create_function = create_function
104         self._start_function = start_function
105         self._stop_function = stop_function
106         self._status_function = status_function
107         self._configure_function = configure_function
108         self._connector_types = dict()
109         self._traces = list()
110         self._box_attributes = AttributesMap()
111
112     @property
113     def factory_id(self):
114         return self._factory_id
115
116     @property
117     def allow_addresses(self):
118         return self._allow_addresses
119
120     @property
121     def allow_routes(self):
122         return self._allow_routes
123
124     @property
125     def box_attributes(self):
126         return self._box_attributes
127
128     @property
129     def create_function(self):
130         return self._create_function
131
132     @property
133     def start_function(self):
134         return self._start_function
135
136     @property
137     def stop_function(self):
138         return self._stop_function
139
140     @property
141     def status_function(self):
142         return self._status_function
143
144     @property
145     def configure_function(self):
146         return self._configure_function
147
148     @property
149     def traces(self):
150         return self._traces
151
152     def connector_type(self, name):
153         return self._connector_types[name]
154
155     def add_connector_type(self, connector_type):
156         self._connector_types[connector_type.name] = connector_type
157
158     def add_trace(self, trace_id):
159         self._traces.append(trace_id)
160
161     def add_box_attribute(self, name, help, type, value = None, range = None,
162         allowed = None, flags = Attribute.NoFlags, validation_function = None):
163         self._box_attributes.add_attribute(name, help, type, value, range, 
164                 allowed, flags, validation_function)
165
166 class TestbedInstance(object):
167     def __init__(self, testbed_id, testbed_version):
168         self._testbed_id = testbed_id
169         self._testbed_version = testbed_version
170
171     @property
172     def guids(self):
173         raise NotImplementedError
174
175     def defer_configure(self, name, value):
176         """Instructs setting a configuartion attribute for the testbed instance"""
177         raise NotImplementedError
178
179     def defer_create(self, guid, factory_id):
180         """Instructs creation of element """
181         raise NotImplementedError
182
183     def defer_create_set(self, guid, name, value):
184         """Instructs setting an initial attribute on an element"""
185         raise NotImplementedError
186
187     def defer_factory_set(self, guid, name, value):
188         """Instructs setting an attribute on a factory"""
189         raise NotImplementedError
190
191     def defer_connect(self, guid1, connector_type_name1, guid2, 
192             connector_type_name2): 
193         """Instructs creation of a connection between the given connectors"""
194         raise NotImplementedError
195
196     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
197             cross_testbed_id, cross_factory_id, cross_connector_type_name):
198         """
199         Instructs creation of a connection between the given connectors 
200         of different testbed instances
201         """
202         raise NotImplementedError
203
204     def defer_add_trace(self, guid, trace_id):
205         """Instructs the addition of a trace"""
206         raise NotImplementedError
207
208     def defer_add_address(self, guid, address, netprefix, broadcast): 
209         """Instructs the addition of an address"""
210         raise NotImplementedError
211
212     def defer_add_route(self, guid, destination, netprefix, nexthop):
213         """Instructs the addition of a route"""
214         raise NotImplementedError
215
216     def do_setup(self):
217         """After do_setup the testbed initial configuration is done"""
218         raise NotImplementedError
219
220     def do_create(self):
221         """
222         After do_create all instructed elements are created and 
223         attributes setted
224         """
225         raise NotImplementedError
226
227     def do_connect(self):
228         """
229         After do_connect all internal connections between testbed elements
230         are done
231         """
232         raise NotImplementedError
233
234     def do_configure(self):
235         """After do_configure elements are configured"""
236         raise NotImplementedError
237
238     def do_cross_connect(self):
239         """
240         After do_cross_connect all external connections between different testbed 
241         elements are done
242         """
243         raise NotImplementedError
244
245     def start(self):
246         raise NotImplementedError
247
248     def stop(self):
249         raise NotImplementedError
250
251     def set(self, time, guid, name, value):
252         raise NotImplementedError
253
254     def get(self, time, guid, name):
255         raise NotImplementedError
256     
257     def get_route(self, guid, index, attribute):
258         """
259         Params:
260             
261             guid: guid of box to query
262             index: number of routing entry to fetch
263             attribute: one of Destination, NextHop, NetPrefix
264         """
265         raise NotImplementedError
266
267     def get_address(self, guid, index, attribute='Address'):
268         """
269         Params:
270             
271             guid: guid of box to query
272             index: number of inteface to select
273             attribute: one of Address, NetPrefix, Broadcast
274         """
275         raise NotImplementedError
276
277     def action(self, time, guid, action):
278         raise NotImplementedError
279
280     def status(self, guid):
281         raise NotImplementedError
282
283     def trace(self, guid, trace_id, attribute='value'):
284         raise NotImplementedError
285
286     def shutdown(self):
287         raise NotImplementedError
288
289 class ExperimentController(object):
290     def __init__(self, experiment_xml):
291         self._experiment_xml = experiment_xml
292         self._testbeds = dict()
293         self._access_config = dict()
294         self._netrefs = dict()
295
296     @property
297     def experiment_xml(self):
298         return self._experiment_xml
299
300     def set_access_configuration(self, testbed_guid, access_config):
301         self._access_config[testbed_guid] = access_config
302
303     def trace(self, testbed_guid, guid, trace_id):
304         return self._testbeds[testbed_guid].trace(guid, trace_id)
305
306     def start(self):
307         self._create_testbed_instances()
308         for testbed in self._testbeds.values():
309             testbed.do_setup()
310         for testbed in self._testbeds.values():
311             testbed.do_create()
312             testbed.do_connect()
313         self.do_netrefs(fail_if_undefined=True)
314         for testbed in self._testbeds.values():
315             testbed.do_configure()
316         for testbed in self._testbeds.values():
317             testbed.do_cross_connect()
318         for testbed in self._testbeds.values():
319             testbed.start()
320
321     def stop(self):
322        for testbed in self._testbeds.values():
323            testbed.stop()
324
325     def is_finished(self, guid):
326         for testbed in self._testbeds.values():
327             for guid_ in testbed.guids:
328                 if guid_ == guid:
329                     return testbed.status(guid) == STATUS_FINISHED
330         raise RuntimeError("No element exists with guid %d" % guid)    
331
332     def shutdown(self):
333        for testbed in self._testbeds.values():
334            testbed.shutdown()
335
336     @staticmethod
337     def _netref_component_split(component):
338         match = COMPONENT_PATTERN.match(component)
339         if match:
340             return match.group("kind"), match.group("index")
341         else:
342             return component, None
343
344     def do_netrefs(self, fail_if_undefined = False):
345         COMPONENT_GETTERS = {
346             'addr' :
347                 lambda testbed, guid, index, name : 
348                     testbed.get_address(guid, index, name),
349             'route' :
350                 lambda testbed, guid, index, name : 
351                     testbed.get_route(guid, index, name),
352             'trace' :
353                 lambda testbed, guid, index, name : 
354                     testbed.trace(guid, index, name),
355             '' : 
356                 lambda testbed, guid, index, name : 
357                     testbed.get(TIME_NOW, guid, name),
358         }
359         
360         for (testbed_guid, guid), attrs in self._netrefs.iteritems():
361             testbed = self._testbeds[testbed_guid]
362             for name in attrs:
363                 value = testbed.get(TIME_NOW, guid, name)
364                 if isinstance(value, basestring):
365                     match = ATTRIBUTE_PATTERN_BASE.search(value)
366                     if match:
367                         label = match.group("label")
368                         if label.startswith('GUID-'):
369                             ref_guid = int(label[5:])
370                             if ref_guid:
371                                 expr = match.group("expr")
372                                 component = match.group("component")[1:] # skip the dot
373                                 attribute = match.group("attribute")
374                                 
375                                 # split compound components into component kind and index
376                                 # eg: 'addr[0]' -> ('addr', '0')
377                                 component, component_index = self._netref_component_split(component)
378                                 
379                                 # find object and resolve expression
380                                 for ref_testbed in self._testbeds.itervalues():
381                                     if component not in COMPONENT_GETTERS:
382                                         raise ValueError, "Malformed netref: %r - unknown component" % (expr,)
383                                     else:
384                                         value = COMPONENT_GETTERS[component](
385                                             ref_testbed, ref_guid, component_index, attribute)
386                                         if value: 
387                                             break
388                                 else:
389                                     # couldn't find value
390                                     if fail_if_undefined:
391                                         raise ValueError, "Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr)
392
393     def _create_testbed_instances(self):
394         parser = XmlExperimentParser()
395         data = parser.from_xml_to_data(self._experiment_xml)
396         element_guids = list()
397         label_guids = dict()
398         data_guids = data.guids
399         netrefs = self._netrefs
400         for guid in data_guids:
401             if data.is_testbed_data(guid):
402                 (testbed_id, testbed_version) = data.get_testbed_data(guid)
403                 access_config = None if guid not in self._access_config else\
404                         self._access_config[guid]
405                 testbed = proxy.create_testbed_instance(testbed_id, 
406                         testbed_version, access_config)
407                 for (name, value) in data.get_attribute_data(guid):
408                     testbed.defer_configure(name, value)
409                 self._testbeds[guid] = testbed
410             else:
411                 element_guids.append(guid)
412                 label = data.get_attribute_data(guid, "label")
413                 if label is not None:
414                     if label in label_guids:
415                         raise RuntimeError, "Label %r is not unique" % (label,)
416                     label_guids[label] = guid
417         for guid in data_guids:
418             if not data.is_testbed_data(guid):
419                 for name, value in data.get_attribute_data(guid):
420                     if isinstance(value, basestring):
421                         match = ATTRIBUTE_PATTERN_BASE.search(value)
422                         if match:
423                             label = match.group("label")
424                             if not label.startswith('GUID-'):
425                                 ref_guid = label_guids.get(label)
426                                 if ref_guid is not None:
427                                     value = ATTRIBUTE_PATTERN_BASE.sub(
428                                         ATTRIBUTE_PATTERN_GUID_SUB % dict(
429                                             guid='GUID-%d' % (ref_guid,),
430                                             expr=match.group("expr"),
431                                             label=label), 
432                                         value)
433                                     data.set_attribute_data(guid, name, value)
434                                     
435                                     # memorize which guid-attribute pairs require
436                                     # postprocessing, to avoid excessive controller-testbed
437                                     # communication at configuration time
438                                     # (which could require high-latency network I/O)
439                                     (testbed_guid, factory_id) = data.get_box_data(guid)
440                                     netrefs.setdefault((testbed_guid,guid),set()).add(name)
441         self._program_testbed_instances(element_guids, data)
442
443     def _program_testbed_instances(self, element_guids, data):
444         for guid in element_guids:
445             (testbed_guid, factory_id) = data.get_box_data(guid)
446             testbed = self._testbeds[testbed_guid]
447             testbed.defer_create(guid, factory_id)
448             for (name, value) in data.get_attribute_data(guid):
449                 testbed.defer_create_set(guid, name, value)
450
451         for guid in element_guids: 
452             (testbed_guid, factory_id) = data.get_box_data(guid)
453             testbed = self._testbeds[testbed_guid]
454             for (connector_type_name, other_guid, other_connector_type_name) \
455                     in data.get_connection_data(guid):
456                 (testbed_guid, factory_id) = data.get_box_data(guid)
457                 (other_testbed_guid, other_factory_id) = data.get_box_data(
458                         other_guid)
459                 if testbed_guid == other_testbed_guid:
460                     testbed.defer_connect(guid, connector_type_name, other_guid, 
461                         other_connector_type_name)
462                 else:
463                     testbed.defer_cross_connect(guid, connector_type_name, other_guid, 
464                         other_testbed_id, other_factory_id, other_connector_type_name)
465             for trace_id in data.get_trace_data(guid):
466                 testbed.defer_add_trace(guid, trace_id)
467             for (autoconf, address, netprefix, broadcast) in \
468                     data.get_address_data(guid):
469                 if address != None:
470                     testbed.defer_add_address(guid, address, netprefix, broadcast)
471             for (destination, netprefix, nexthop) in data.get_route_data(guid):
472                 testbed.defer_add_route(guid, destination, netprefix, nexthop)
473