Ticket #11: parallel execution
[nepi.git] / src / nepi / core / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core.attributes import Attribute, AttributesMap
5 from nepi.util import proxy, validation
6 from nepi.util.constants import STATUS_FINISHED, TIME_NOW
7 from nepi.util.parser._xml import XmlExperimentParser
8 import sys
9 import re
10 import threading
11
# Matches a "netref": a reference to a value of another box, written as
#   {#[<label>]<expr>#}
# where <expr> is an optional component (.addr[N], .route[N] or .trace[N])
# followed by a separator character and [<attribute>].
# eg: {#[GUID-5].addr[0].[Address]#}
# NOTE(review): the separator before [<attribute>] is an unescaped '.', so
# any single character is accepted there, not only a literal dot - confirm
# whether that is intended before tightening it.
ATTRIBUTE_PATTERN_BASE = re.compile(
    r"\{#"                                    # opening delimiter
    r"\[(?P<label>[-a-zA-Z0-9._]*)\]"         # [label]
    r"(?P<expr>"                              # everything after the label
    r"(?P<component>"
    r"\.addr\[[0-9]+\]"                       #   .addr[N]
    r"|\.route\[[0-9]+\]"                     #   .route[N]
    r"|\.trace\[[0-9]+\]"                     #   .trace[N]
    r"|)"                                     #   or no component at all
    r".\[(?P<attribute>[-a-zA-Z0-9._]*)\]"    # separator + [attribute]
    r")"
    r"#}")                                    # closing delimiter

# Template used to rewrite a netref so that its label becomes a GUID label;
# filled in with the keys 'guid' and 'expr'.
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"

# Splits a compound component such as 'addr[0]' into kind and index.
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
15
class ConnectorType(object):
    """
    Describes one kind of connector and the connector types it can be
    connected with.

    A connector type is identified by the lower-cased triple
    (testbed_id, factory_id, name).
    """

    def __init__(self, testbed_id, factory_id, name, max = -1, min = 0):
        """
        Params:

            testbed_id, factory_id, name: together identify the connector type
            max: maximum number of connections supported, -1 for no limit
            min: minimum number of connections required

        Raises RuntimeError if max is 0 or negative (other than -1), or if
        min is negative.
        """
        super(ConnectorType, self).__init__()
        if max == -1:
            # "no limit" is stored as the largest native integer;
            # sys.maxint only exists on Python 2, so fall back to maxsize
            max = getattr(sys, "maxint", sys.maxsize)
        elif max <= 0:
            raise RuntimeError(
             "The maximum number of connections allowed need to be more than 0")
        if min < 0:
            raise RuntimeError(
             "The minimum number of connections allowed needs to be at least 0")
        # connector_type_id -- uniquely identifies a connector type
        # across testbeds
        self._connector_type_id = self._make_connector_type_id(testbed_id,
                factory_id, name)
        # name -- display name for the connector type
        self._name = name
        # max -- maximum amount of connections that this type supports
        self._max = max
        # min -- minimum amount of connections required by this type of connector
        self._min = min
        # from_connections -- connections where the other connector is the "From"
        # to_connections -- connections where the other connector is the "To"
        # keys in the dictionaries correspond to the
        # connector_type_id for possible connections. The value is a tuple:
        # (can_cross, code)
        # can_cross: indicates if the connection is allowed across different
        #    testbed instances
        # code: is the connection function to be invoked when the elements
        #    are connected
        self._from_connections = dict()
        self._to_connections = dict()

    @staticmethod
    def _make_connector_type_id(testbed_id, factory_id, name):
        """Builds the case-insensitive key used to identify a connector type"""
        return (testbed_id.lower(), factory_id.lower(), name.lower())

    @property
    def connector_type_id(self):
        return self._connector_type_id

    @property
    def name(self):
        return self._name

    @property
    def max(self):
        return self._max

    @property
    def min(self):
        return self._min

    def add_from_connection(self, testbed_id, factory_id, name, can_cross, code):
        """Registers a connector type allowed as the "From" end"""
        key = self._make_connector_type_id(testbed_id, factory_id, name)
        self._from_connections[key] = (can_cross, code)

    def add_to_connection(self, testbed_id, factory_id, name, can_cross, code):
        """Registers a connector type allowed as the "To" end"""
        key = self._make_connector_type_id(testbed_id, factory_id, name)
        self._to_connections[key] = (can_cross, code)

    def can_connect(self, testbed_id, factory_id, name, count, 
            must_cross = False):
        """
        Tells whether a connection with the given connector type is allowed.

        Returns False if the connector type was never registered. Otherwise
        the connection is allowed unless must_cross is set and the
        registered entry does not allow crossing testbed instances.

        NOTE: count is accepted but not checked against min/max here.
        """
        key = self._make_connector_type_id(testbed_id, factory_id, name)
        # "From" registrations take precedence over "To" ones
        for connections in (self._from_connections, self._to_connections):
            if key in connections:
                (can_cross, code) = connections[key]
                return not must_cross or can_cross
        return False

    def code_to_connect(self, testbed_id, factory_id, name):
        """
        Returns the connection function registered for the given "To"
        connector type, or False if there is none registered.
        """
        key = self._make_connector_type_id(testbed_id, factory_id, name)
        if key not in self._to_connections:
            return False
        (can_cross, code) = self._to_connections[key]
        return code
93
94 # TODO: create_function, start_function, stop_function, status_function 
95 # need a definition!
class Factory(AttributesMap):
    """
    Holds what is registered for one factory: its lifecycle functions,
    its connector types, its trace ids and the attributes of the boxes
    it produces.
    """

    def __init__(self, factory_id, create_function, start_function, 
            stop_function, status_function, configure_function,
            allow_addresses = False, allow_routes = False):
        super(Factory, self).__init__()
        self._factory_id = factory_id

        # lifecycle functions, stored as given
        self._create_function = create_function
        self._start_function = start_function
        self._stop_function = stop_function
        self._status_function = status_function
        self._configure_function = configure_function

        # capability flags: only a value equal to True enables them
        self._allow_addresses = (allow_addresses == True)
        self._allow_routes = (allow_routes == True)

        # connector type name -> connector type
        self._connector_types = dict()
        # trace ids, in registration order
        self._traces = list()
        # attributes of the boxes, kept apart from the factory's own
        self._box_attributes = AttributesMap()

    @property
    def factory_id(self):
        """Identifier given at construction"""
        return self._factory_id

    @property
    def allow_addresses(self):
        """True only if allow_addresses was given equal to True"""
        return self._allow_addresses

    @property
    def allow_routes(self):
        """True only if allow_routes was given equal to True"""
        return self._allow_routes

    @property
    def box_attributes(self):
        """AttributesMap filled through add_box_attribute"""
        return self._box_attributes

    @property
    def create_function(self):
        return self._create_function

    @property
    def start_function(self):
        return self._start_function

    @property
    def stop_function(self):
        return self._stop_function

    @property
    def status_function(self):
        return self._status_function

    @property
    def configure_function(self):
        return self._configure_function

    @property
    def traces(self):
        """List of the trace ids added so far"""
        return self._traces

    def connector_type(self, name):
        """Returns the connector type registered under name (KeyError if none)"""
        return self._connector_types[name]

    def add_connector_type(self, connector_type):
        """Registers connector_type under its own name, replacing any previous one"""
        type_name = connector_type.name
        self._connector_types[type_name] = connector_type

    def add_trace(self, trace_id):
        """Appends trace_id to the list of traces"""
        self._traces.append(trace_id)

    def add_box_attribute(self, name, help, type, value = None, range = None,
        allowed = None, flags = Attribute.NoFlags, validation_function = None):
        """Adds an attribute to box_attributes, forwarding every argument"""
        box_attributes = self._box_attributes
        box_attributes.add_attribute(name, help, type, value, range, 
                allowed, flags, validation_function)
166
class TestbedInstance(object):
    """
    Interface of a testbed instance.

    Only the testbed id and version are stored here; every operation
    raises NotImplementedError.
    """

    def __init__(self, testbed_id, testbed_version):
        self._testbed_id = testbed_id
        self._testbed_version = testbed_version

    @property
    def guids(self):
        raise NotImplementedError

    # -- deferred instructions ------------------------------------------

    def defer_configure(self, name, value):
        """Instructs setting a configuration attribute for the testbed instance"""
        raise NotImplementedError

    def defer_create(self, guid, factory_id):
        """Instructs the creation of an element"""
        raise NotImplementedError

    def defer_create_set(self, guid, name, value):
        """Instructs setting an initial attribute on an element"""
        raise NotImplementedError

    def defer_factory_set(self, guid, name, value):
        """Instructs setting an attribute on a factory"""
        raise NotImplementedError

    def defer_connect(self, guid1, connector_type_name1, guid2, 
            connector_type_name2): 
        """Instructs the creation of a connection between the given connectors"""
        raise NotImplementedError

    def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
            cross_testbed_id, cross_factory_id, cross_connector_type_name):
        """
        Instructs the creation of a connection between the given connectors
        of different testbed instances
        """
        raise NotImplementedError

    def defer_add_trace(self, guid, trace_id):
        """Instructs the addition of a trace"""
        raise NotImplementedError

    def defer_add_address(self, guid, address, netprefix, broadcast): 
        """Instructs the addition of an address"""
        raise NotImplementedError

    def defer_add_route(self, guid, destination, netprefix, nexthop):
        """Instructs the addition of a route"""
        raise NotImplementedError

    # -- execution of the deferred instructions --------------------------

    def do_setup(self):
        """After do_setup the initial configuration of the testbed is done"""
        raise NotImplementedError

    def do_create(self):
        """
        After do_create all instructed elements are created and their
        attributes set
        """
        raise NotImplementedError

    def do_connect(self):
        """
        After do_connect all internal connections between testbed elements
        are done
        """
        raise NotImplementedError

    def do_configure(self):
        """After do_configure the elements are configured"""
        raise NotImplementedError

    def do_cross_connect(self):
        """
        After do_cross_connect all external connections between elements
        of different testbeds are done
        """
        raise NotImplementedError

    # -- runtime operations ----------------------------------------------

    def start(self):
        raise NotImplementedError

    def stop(self):
        raise NotImplementedError

    def set(self, time, guid, name, value):
        raise NotImplementedError

    def get(self, time, guid, name):
        raise NotImplementedError
    
    def get_route(self, guid, index, attribute):
        """
        Params:
            
            guid: guid of the box to query
            index: number of the routing entry to fetch
            attribute: one of Destination, NextHop, NetPrefix
        """
        raise NotImplementedError

    def get_address(self, guid, index, attribute='Address'):
        """
        Params:
            
            guid: guid of the box to query
            index: number of the interface to select
            attribute: one of Address, NetPrefix, Broadcast
        """
        raise NotImplementedError

    def action(self, time, guid, action):
        raise NotImplementedError

    def status(self, guid):
        raise NotImplementedError

    def trace(self, guid, trace_id, attribute='value'):
        raise NotImplementedError

    def shutdown(self):
        raise NotImplementedError
289
class ExperimentController(object):
    """
    Builds the testbed instances described by an experiment XML and drives
    them through setup, creation, connection, configuration and start.
    """

    # string base type: 'basestring' on Python 2, 'str' where it is missing
    try:
        _STRING_TYPES = basestring
    except NameError:
        _STRING_TYPES = str

    def __init__(self, experiment_xml):
        self._experiment_xml = experiment_xml
        # testbed guid -> testbed instance
        self._testbeds = dict()
        # testbed guid -> access configuration
        self._access_config = dict()
        # (testbed guid, element guid) -> set of names of the attributes
        # whose value holds a netref
        self._netrefs = dict()

    @property
    def experiment_xml(self):
        return self._experiment_xml

    def set_access_configuration(self, testbed_guid, access_config):
        """Stores the access configuration to use for the given testbed"""
        self._access_config[testbed_guid] = access_config

    def trace(self, testbed_guid, guid, trace_id):
        """Returns the trace of an element, as given by its testbed"""
        return self._testbeds[testbed_guid].trace(guid, trace_id)

    @staticmethod
    def _parallel(callables):
        """
        Runs every callable in its own thread and waits for all of them.

        An exception raised by a callable is printed to stderr and, once
        all threads are done, the first one recorded is raised again in
        the calling thread, so that failures do not pass unnoticed.
        """
        import traceback

        failures = []

        def guarded(task):
            def run():
                try:
                    task()
                except Exception:
                    failures.append(sys.exc_info()[1])
                    traceback.print_exc()
            return run

        threads = [ threading.Thread(target=guarded(task)) 
                    for task in callables ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        if failures:
            raise failures[0]

    @staticmethod
    def _create_and_connect(testbed):
        """Returns a callable that runs do_create and do_connect on testbed"""
        # the testbed is bound here, as an argument: a lambda written inside
        # a loop would see only the last testbed by the time it runs
        def task():
            testbed.do_create()
            testbed.do_connect()
        return task

    def start(self):
        """
        Creates the testbed instances and takes all of them up to the
        started state.
        """
        self._create_testbed_instances()
        testbeds = list(self._testbeds.values())
        
        # perform setup in parallel for all testbeds,
        # wait for all threads to finish
        self._parallel([testbed.do_setup for testbed in testbeds])
        
        # perform create-connect in parallel, wait
        # (internal connections only)
        self._parallel([self._create_and_connect(testbed) 
                        for testbed in testbeds])
        
        # resolve netrefs
        self.do_netrefs(fail_if_undefined=True)
        
        # perform do_configure in parallel for all testbeds
        # (it's internal configuration for each)
        self._parallel([testbed.do_configure for testbed in testbeds])

        # cross-connect (cannot be done in parallel)
        for testbed in testbeds:
            testbed.do_cross_connect()
        
        # start experiment (parallel start on all testbeds)
        self._parallel([testbed.start for testbed in testbeds])

    def stop(self):
        """Stops every testbed, one after the other"""
        for testbed in self._testbeds.values():
            testbed.stop()

    def is_finished(self, guid):
        """
        Tells whether the status of the element is STATUS_FINISHED.

        Raises RuntimeError if no testbed lists the guid.
        """
        for testbed in self._testbeds.values():
            if guid in testbed.guids:
                return testbed.status(guid) == STATUS_FINISHED
        raise RuntimeError("No element exists with guid %d" % guid)    

    def shutdown(self):
        """Shuts down every testbed, one after the other"""
        for testbed in self._testbeds.values():
            testbed.shutdown()

    @staticmethod
    def _netref_component_split(component):
        """
        Splits a compound component into kind and index,
        eg: 'addr[0]' -> ('addr', '0').
        A component without index is returned as (component, None).
        """
        match = COMPONENT_PATTERN.match(component)
        if match:
            return match.group("kind"), match.group("index")
        else:
            return component, None

    def do_netrefs(self, fail_if_undefined = False):
        """
        Resolves the netrefs found in the attributes memorized in
        self._netrefs, replacing each one by the value it refers to.

        Only the first netref of a value is resolved, and only if its
        label has the form GUID-<number>. The testbeds are queried one
        after the other until one gives a true value, which is then
        written into the attribute in place of the netref.

        Raises ValueError for a netref with an unknown component and,
        if fail_if_undefined is set, for a netref no testbed resolves.
        """
        COMPONENT_GETTERS = {
            'addr' :
                lambda testbed, guid, index, name : 
                    testbed.get_address(guid, index, name),
            'route' :
                lambda testbed, guid, index, name : 
                    testbed.get_route(guid, index, name),
            'trace' :
                lambda testbed, guid, index, name : 
                    testbed.trace(guid, index, name),
            '' : 
                lambda testbed, guid, index, name : 
                    testbed.get(TIME_NOW, guid, name),
        }
        
        for (testbed_guid, guid), attrs in self._netrefs.items():
            testbed = self._testbeds[testbed_guid]
            for name in attrs:
                value = testbed.get(TIME_NOW, guid, name)
                if not isinstance(value, self._STRING_TYPES):
                    continue
                match = ATTRIBUTE_PATTERN_BASE.search(value)
                if not match:
                    continue
                label = match.group("label")
                if not label.startswith('GUID-'):
                    continue
                ref_guid = int(label[5:])
                if not ref_guid:
                    continue
                
                expr = match.group("expr")
                component = match.group("component")[1:] # skip the dot
                attribute = match.group("attribute")
                
                # split compound components into component kind and index
                # eg: 'addr[0]' -> ('addr', '0')
                component, component_index = self._netref_component_split(component)
                if component not in COMPONENT_GETTERS:
                    raise ValueError("Malformed netref: %r - unknown component" % (expr,))
                getter = COMPONENT_GETTERS[component]
                
                # find object and resolve expression
                for ref_testbed in self._testbeds.values():
                    ref_value = getter(ref_testbed, ref_guid, 
                        component_index, attribute)
                    if ref_value:
                        # put the resolved value where the netref was
                        testbed.set(TIME_NOW, guid, name, 
                            value.replace(match.group(), "%s" % (ref_value,)))
                        break
                else:
                    # couldn't find value
                    if fail_if_undefined:
                        raise ValueError("Unresolvable GUID: %r, in netref: %r" % (ref_guid, expr))

    def _create_testbed_instances(self):
        """
        Parses the experiment XML, creates one testbed instance per
        testbed found in it, rewrites the netrefs that use labels into
        netrefs that use guids, and programs the elements into the
        testbeds.

        Raises RuntimeError if two elements have the same label.
        """
        parser = XmlExperimentParser()
        data = parser.from_xml_to_data(self._experiment_xml)
        element_guids = list()
        label_guids = dict()
        data_guids = data.guids
        netrefs = self._netrefs
        
        # first pass: create the testbeds, collect elements and labels
        for guid in data_guids:
            if data.is_testbed_data(guid):
                (testbed_id, testbed_version) = data.get_testbed_data(guid)
                access_config = self._access_config.get(guid)
                testbed = proxy.create_testbed_instance(testbed_id, 
                        testbed_version, access_config)
                for (name, value) in data.get_attribute_data(guid):
                    testbed.defer_configure(name, value)
                self._testbeds[guid] = testbed
            else:
                element_guids.append(guid)
                label = data.get_attribute_data(guid, "label")
                if label is not None:
                    if label in label_guids:
                        raise RuntimeError("Label %r is not unique" % (label,))
                    label_guids[label] = guid
        
        # second pass: turn label netrefs into guid netrefs
        for guid in data_guids:
            if data.is_testbed_data(guid):
                continue
            for name, value in data.get_attribute_data(guid):
                if not isinstance(value, self._STRING_TYPES):
                    continue
                match = ATTRIBUTE_PATTERN_BASE.search(value)
                if not match:
                    continue
                label = match.group("label")
                if label.startswith('GUID-'):
                    continue
                ref_guid = label_guids.get(label)
                if ref_guid is None:
                    continue
                value = ATTRIBUTE_PATTERN_BASE.sub(
                    ATTRIBUTE_PATTERN_GUID_SUB % dict(
                        guid='GUID-%d' % (ref_guid,),
                        expr=match.group("expr"),
                        label=label), 
                    value)
                data.set_attribute_data(guid, name, value)
                
                # memorize which guid-attribute pairs require
                # postprocessing, to avoid excessive controller-testbed
                # communication at configuration time
                # (which could require high-latency network I/O)
                (testbed_guid, factory_id) = data.get_box_data(guid)
                netrefs.setdefault((testbed_guid,guid),set()).add(name)
        self._program_testbed_instances(element_guids, data)

    def _program_testbed_instances(self, element_guids, data):
        """
        Instructs the testbeds to create the given elements with their
        attributes, connections, traces, addresses and routes.
        """
        for guid in element_guids:
            (testbed_guid, factory_id) = data.get_box_data(guid)
            testbed = self._testbeds[testbed_guid]
            testbed.defer_create(guid, factory_id)
            for (name, value) in data.get_attribute_data(guid):
                testbed.defer_create_set(guid, name, value)

        for guid in element_guids: 
            (testbed_guid, factory_id) = data.get_box_data(guid)
            testbed = self._testbeds[testbed_guid]
            for (connector_type_name, other_guid, other_connector_type_name) \
                    in data.get_connection_data(guid):
                (other_testbed_guid, other_factory_id) = data.get_box_data(
                        other_guid)
                if testbed_guid == other_testbed_guid:
                    testbed.defer_connect(guid, connector_type_name, other_guid, 
                        other_connector_type_name)
                else:
                    # the other end lives in another testbed: its testbed id
                    # comes from the data of that testbed
                    (other_testbed_id, other_testbed_version) = \
                        data.get_testbed_data(other_testbed_guid)
                    testbed.defer_cross_connect(guid, connector_type_name, other_guid, 
                        other_testbed_id, other_factory_id, other_connector_type_name)
            for trace_id in data.get_trace_data(guid):
                testbed.defer_add_trace(guid, trace_id)
            for (autoconf, address, netprefix, broadcast) in \
                    data.get_address_data(guid):
                # entries without an address are skipped
                if address is not None:
                    testbed.defer_add_address(guid, address, netprefix, broadcast)
            for (destination, netprefix, nexthop) in data.get_route_data(guid):
                testbed.defer_add_route(guid, destination, netprefix, nexthop)
497