Ticket #71: inner parallelization of setup phases
[nepi.git] / src / nepi / core / testbed_impl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata, Parallel
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8         ApplicationStatus as AS, \
9         TestbedStatus as TS, \
10         CONNECTION_DELAY
11 from nepi.util.parallel import ParallelRun
12
13 import collections
14 import copy
15
16 class TestbedController(execute.TestbedController):
17     def __init__(self, testbed_id, testbed_version):
18         super(TestbedController, self).__init__(testbed_id, testbed_version)
19         self._status = TS.STATUS_ZERO
20         # testbed attributes for validation
21         self._attributes = None
22         # element factories for validation
23         self._factories = dict()
24
25         # experiment construction instructions
26         self._create = dict()
27         self._create_set = dict()
28         self._factory_set = dict()
29         self._connect = dict()
30         self._cross_connect = dict()
31         self._add_trace = dict()
32         self._add_address = dict()
33         self._add_route = dict()
34         self._configure = dict()
35
36         # log of set operations
37         self._setlog = dict()
38         # last set operations
39         self._set = dict()
40
41         # testbed element instances
42         self._elements = dict()
43
44         self._metadata = Metadata(self._testbed_id)
45         if self._metadata.testbed_version != testbed_version:
46             raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
47                     (testbed_id, testbed_version, self._metadata.testbed_version))
48         for factory in self._metadata.build_factories():
49             self._factories[factory.factory_id] = factory
50         self._attributes = self._metadata.testbed_attributes()
51         self._root_directory = None
52     
53     @property
54     def root_directory(self):
55         return self._root_directory
56
57     @property
58     def guids(self):
59         return self._create.keys()
60
61     @property
62     def elements(self):
63         return self._elements
64     
65     def defer_configure(self, name, value):
66         self._validate_testbed_attribute(name)
67         self._validate_testbed_value(name, value)
68         self._attributes.set_attribute_value(name, value)
69         self._configure[name] = value
70
71     def defer_create(self, guid, factory_id):
72         self._validate_factory_id(factory_id)
73         self._validate_not_guid(guid)
74         self._create[guid] = factory_id
75
76     def defer_create_set(self, guid, name, value):
77         self._validate_guid(guid)
78         self._validate_box_attribute(guid, name)
79         self._validate_box_value(guid, name, value)
80         if guid not in self._create_set:
81             self._create_set[guid] = dict()
82         self._create_set[guid][name] = value
83
84     def defer_factory_set(self, guid, name, value):
85         self._validate_guid(guid)
86         self._validate_factory_attribute(guid, name)
87         self._validate_factory_value(guid, name, value)
88         if guid not in self._factory_set:
89             self._factory_set[guid] = dict()
90         self._factory_set[guid][name] = value
91
92     def defer_connect(self, guid1, connector_type_name1, guid2, 
93             connector_type_name2):
94         self._validate_guid(guid1)
95         self._validate_guid(guid2)
96         factory1 = self._get_factory(guid1)
97         factory_id2 = self._create[guid2]
98         connector_type = factory1.connector_type(connector_type_name1)
99         connector_type.can_connect(self._testbed_id, factory_id2, 
100                 connector_type_name2, False)
101         self._validate_connection(guid1, connector_type_name1, guid2, 
102             connector_type_name2)
103
104         if not guid1 in self._connect:
105             self._connect[guid1] = dict()
106         if not connector_type_name1 in self._connect[guid1]:
107              self._connect[guid1][connector_type_name1] = dict()
108         self._connect[guid1][connector_type_name1][guid2] = \
109                connector_type_name2
110         if not guid2 in self._connect:
111             self._connect[guid2] = dict()
112         if not connector_type_name2 in self._connect[guid2]:
113              self._connect[guid2][connector_type_name2] = dict()
114         self._connect[guid2][connector_type_name2][guid1] = \
115                connector_type_name1
116
117     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
118             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
119             cross_connector_type_name):
120         self._validate_guid(guid)
121         factory = self._get_factory(guid)
122         connector_type = factory.connector_type(connector_type_name)
123         connector_type.can_connect(cross_testbed_id, cross_factory_id, 
124                 cross_connector_type_name, True)
125         self._validate_connection(guid, connector_type_name, cross_guid, 
126             cross_connector_type_name)
127
128         if not guid in self._cross_connect:
129             self._cross_connect[guid] = dict()
130         if not connector_type_name in self._cross_connect[guid]:
131              self._cross_connect[guid][connector_type_name] = dict()
132         self._cross_connect[guid][connector_type_name] = \
133                 (cross_guid, cross_testbed_guid, cross_testbed_id, 
134                 cross_factory_id, cross_connector_type_name)
135
136     def defer_add_trace(self, guid, trace_name):
137         self._validate_guid(guid)
138         self._validate_trace(guid, trace_name)
139         if not guid in self._add_trace:
140             self._add_trace[guid] = list()
141         self._add_trace[guid].append(trace_name)
142
143     def defer_add_address(self, guid, address, netprefix, broadcast):
144         self._validate_guid(guid)
145         self._validate_allow_addresses(guid)
146         if guid not in self._add_address:
147             self._add_address[guid] = list()
148         self._add_address[guid].append((address, netprefix, broadcast))
149
150     def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
151         self._validate_guid(guid)
152         self._validate_allow_routes(guid)
153         if not guid in self._add_route:
154             self._add_route[guid] = list()
155         self._add_route[guid].append((destination, netprefix, nexthop, metric)) 
156
157     def do_setup(self):
158         self._root_directory = self._attributes.\
159             get_attribute_value("rootDirectory")
160         self._status = TS.STATUS_SETUP
161
162     def do_create(self):
163         def set_params(self, guid):
164             parameters = self._get_parameters(guid)
165             for name, value in parameters.iteritems():
166                 self.set(guid, name, value)
167             
168         self._do_in_factory_order(
169             'create_function',
170             self._metadata.create_order,
171             postaction = set_params )
172         self._status = TS.STATUS_CREATED
173
    def _do_connect(self, init = True):
        """
        runs the connection code of every connection in self._connect.

        With init set, the code is obtained from connect_to_init_code of
        the local connector type, otherwise from connect_to_compl_code.

        A connection is dropped from the pending set unless its code
        returns CONNECTION_DELAY; delayed connections are retried on the
        next pass of the outer loop.

        NOTE(review): the loop never ends if some connection keeps
        returning CONNECTION_DELAY.
        """
        # work on a copy: entries are deleted as connections are done
        unconnected = copy.deepcopy(self._connect)
        
        while unconnected:
            # NOTE(review): entries are deleted while looping over
            # .items(); this relies on .items() returning a list copy
            # (Python 2 semantics)
            for guid1, connections in unconnected.items():
                factory1 = self._get_factory(guid1)
                for connector_type_name1, connections2 in connections.items():
                    connector_type1 = factory1.connector_type(connector_type_name1)
                    for guid2, connector_type_name2 in connections2.items():
                        factory_id2 = self._create[guid2]
                        # Connections are executed in a "From -> To" direction only
                        # This explicitly ignores the "To -> From" (mirror) 
                        # connections of every connection pair.
                        # NOTE(review): both directions are visited here;
                        # presumably the mirror lookup yields no code -- confirm
                        if init:
                            connect_code = connector_type1.connect_to_init_code(
                                    self._testbed_id, factory_id2, 
                                    connector_type_name2,
                                    False)
                        else:
                            connect_code = connector_type1.connect_to_compl_code(
                                    self._testbed_id, factory_id2, 
                                    connector_type_name2,
                                    False)
                        # no code to run counts as "done"
                        delay = None
                        if connect_code:
                            delay = connect_code(self, guid1, guid2)

                        if delay is not CONNECTION_DELAY:
                            del unconnected[guid1][connector_type_name1][guid2]
                    # prune connectors, then elements, left without pending
                    # connections
                    if not unconnected[guid1][connector_type_name1]:
                        del unconnected[guid1][connector_type_name1]
                if not unconnected[guid1]:
                    del unconnected[guid1]
207
208     def do_connect_init(self):
209         self._do_connect()
210
211     def do_connect_compl(self):
212         self._do_connect(init = False)
213         self._status = TS.STATUS_CONNECTED
214
    def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
        """
        calls the factory function named action on every created element,
        visiting the factories in the given order.

        action: name of a factory attribute holding a callable taking
            (testbed, guid); factories where it is false are skipped.
        order: iterable of factory ids; an entry may be wrapped in a
            Parallel object, in which case the elements of that factory
            are handed to a ParallelRun runner.
        postaction: optional callable (testbed, guid), invoked right after
            the action on each element (inside the runner when parallel).
        poststep: optional callable (testbed, guid), invoked for each
            element of a factory once all its elements have been processed.
        """
        guids = collections.defaultdict(list)
        # order guids (elements) according to factory_id
        for guid, factory_id in self._create.iteritems():
            guids[factory_id].append(guid)
        
        # configure elements following the factory_id order
        for factory_id in order:
            # Create a parallel runner if we're given a Parallel() wrapper
            runner = None
            if isinstance(factory_id, Parallel):
                runner = ParallelRun(factory_id.maxthreads)
                factory_id = factory_id.factory
            
            # omit the factories that have no element to create
            if factory_id not in guids:
                continue
            
            # configure action
            factory = self._factories[factory_id]
            if not getattr(factory, action):
                continue
            # the closure reads 'factory' when it runs; the runner is
            # joined before the next factory is picked
            def perform_action(guid):
                getattr(factory, action)(self, guid)
                if postaction:
                    postaction(self, guid)

            # perform the action on all elements, in parallel if so requested
            # NOTE(review): presumably ParallelRun executes the queued
            # calls on worker threads -- confirm in nepi.util.parallel
            if runner:
                runner.start()
            for guid in guids[factory_id]:
                if runner:
                    runner.put(perform_action, guid)
                else:
                    perform_action(guid)
            if runner:
                runner.join()
            
            # post hook
            if poststep:
                for guid in guids[factory_id]:
                    poststep(self, guid)
257
258     @staticmethod
259     def do_poststep_preconfigure(self, guid):
260         # dummy hook for implementations interested in
261         # two-phase configuration
262         pass
263
264     def do_preconfigure(self):
265         self._do_in_factory_order(
266             'preconfigure_function',
267             self._metadata.preconfigure_order,
268             poststep = self.do_poststep_preconfigure )
269
270     @staticmethod
271     def do_poststep_configure(self, guid):
272         # dummy hook for implementations interested in
273         # two-phase configuration
274         pass
275
276     def do_configure(self):
277         self._do_in_factory_order(
278             'configure_function',
279             self._metadata.configure_order,
280             poststep = self.do_poststep_configure )
281         self._status = TS.STATUS_CONFIGURED
282
283     def do_prestart(self):
284         self._do_in_factory_order(
285             'prestart_function',
286             self._metadata.prestart_order )
287
288     def _do_cross_connect(self, cross_data, init = True):
289         for guid, cross_connections in self._cross_connect.iteritems():
290             factory = self._get_factory(guid)
291             for connector_type_name, cross_connection in \
292                     cross_connections.iteritems():
293                 connector_type = factory.connector_type(connector_type_name)
294                 (cross_guid, cross_testbed_guid, cross_testbed_id,
295                     cross_factory_id, cross_connector_type_name) = cross_connection
296                 if init:
297                     connect_code = connector_type.connect_to_init_code(
298                         cross_testbed_id, cross_factory_id, 
299                         cross_connector_type_name,
300                         True)
301                 else:
302                     connect_code = connector_type.connect_to_compl_code(
303                         cross_testbed_id, cross_factory_id, 
304                         cross_connector_type_name,
305                         True)
306                 if connect_code:
307                     elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
308                     connect_code(self, guid, elem_cross_data)       
309
310     def do_cross_connect_init(self, cross_data):
311         self._do_cross_connect(cross_data)
312
313     def do_cross_connect_compl(self, cross_data):
314         self._do_cross_connect(cross_data, init = False)
315         self._status = TS.STATUS_CROSS_CONNECTED
316
317     def set(self, guid, name, value, time = TIME_NOW):
318         self._validate_guid(guid)
319         self._validate_box_attribute(guid, name)
320         self._validate_box_value(guid, name, value)
321         self._validate_modify_box_value(guid, name)
322         if guid not in self._set:
323             self._set[guid] = dict()
324             self._setlog[guid] = dict()
325         if time not in self._setlog[guid]:
326             self._setlog[guid][time] = dict()
327         self._setlog[guid][time][name] = value
328         self._set[guid][name] = value
329
330     def get(self, guid, name, time = TIME_NOW):
331         """
332         gets an attribute from box definitions if available. 
333         Throws KeyError if the GUID wasn't created
334         through the defer_create interface, and AttributeError if the
335         attribute isn't available (doesn't exist or is design-only)
336         """
337         self._validate_guid(guid)
338         self._validate_box_attribute(guid, name)
339         if guid in self._set and name in self._set[guid]:
340             return self._set[guid][name]
341         if guid in self._create_set and name in self._create_set[guid]:
342             return self._create_set[guid][name]
343         # if nothing else found, returns the factory default value
344         factory = self._get_factory(guid)
345         return factory.box_attributes.get_attribute_value(name)
346
347     def get_route(self, guid, index, attribute):
348         """
349         returns information given to defer_add_route.
350         
351         Raises AttributeError if an invalid attribute is requested
352             or if the indexed routing rule does not exist.
353         
354         Raises KeyError if the GUID has not been seen by
355             defer_add_route
356         """
357         ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
358         
359         if attribute not in ATTRIBUTES:
360             raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
361         
362         attribute_index = ATTRIBUTES.index(attribute)
363         
364         routes = self._add_route.get(guid)
365         if not routes:
366             raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
367        
368         index = int(index)
369         if not (0 <= index < len(addresses)):
370             raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
371                 guid, self._testbed_id, index)
372         
373         return routes[index][attribute_index]
374
375     def get_address(self, guid, index, attribute='Address'):
376         """
377         returns information given to defer_add_address
378         
379         Raises AttributeError if an invalid attribute is requested
380             or if the indexed routing rule does not exist.
381         
382         Raises KeyError if the GUID has not been seen by
383             defer_add_address
384         """
385         ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
386         
387         if attribute not in ATTRIBUTES:
388             raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
389         
390         attribute_index = ATTRIBUTES.index(attribute)
391         
392         addresses = self._add_address.get(guid)
393         if not addresses:
394             raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
395         
396         index = int(index)
397         if not (0 <= index < len(addresses)):
398             raise AttributeError, "GUID %r at %s does not have an address #%s" % (
399                 guid, self._testbed_id, index)
400         
401         return addresses[index][attribute_index]
402
403     def get_attribute_list(self, guid, filter_flags = None, exclude = False):
404         factory = self._get_factory(guid)
405         attribute_list = list()
406         return factory.box_attributes.get_attribute_list(filter_flags, exclude)
407
408     def get_factory_id(self, guid):
409         factory = self._get_factory(guid)
410         return factory.factory_id
411
412     def start(self, time = TIME_NOW):
413         self._do_in_factory_order(
414             'start_function',
415             self._metadata.start_order )
416         self._status = TS.STATUS_STARTED
417
418     #action: NotImplementedError
419
420     def stop(self, time = TIME_NOW):
421         self._do_in_factory_order(
422             'stop_function',
423             reversed(self._metadata.start_order) )
424         self._status = TS.STATUS_STOPPED
425
426     def status(self, guid = None):
427         if not guid:
428             return self._status
429         self._validate_guid(guid)
430         factory = self._get_factory(guid)
431         status_function = factory.status_function
432         if status_function:
433             return status_function(self, guid)
434         return AS.STATUS_UNDETERMINED
435
436     def trace(self, guid, trace_id, attribute='value'):
437         if attribute == 'value':
438             fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
439             content = fd.read()
440             fd.close()
441         elif attribute == 'path':
442             content = self.trace_filepath(guid, trace_id)
443         else:
444             content = None
445         return content
446
447     def traces_info(self):
448         traces_info = dict()
449         host = self._attributes.get_attribute_value("deployment_host")
450         user = self._attributes.get_attribute_value("deployment_user")
451         for guid, trace_list in self._add_trace.iteritems(): 
452             traces_info[guid] = dict()
453             for trace_id in trace_list:
454                 traces_info[guid][trace_id] = dict()
455                 filepath = self.trace(guid, trace_id, attribute = "path")
456                 traces_info[guid][trace_id]["host"] = host
457                 traces_info[guid][trace_id]["user"] = user
458                 traces_info[guid][trace_id]["filepath"] = filepath
459         return traces_info
460
461     def trace_filepath(self, guid, trace_id):
462         """
463         Return a trace's file path, for TestbedController's default 
464         implementation of trace()
465         """
466         raise NotImplementedError
467
468     #shutdown: NotImplementedError
469
470     def get_connected(self, guid, connector_type_name, 
471             other_connector_type_name):
472         """searchs the connected elements for the specific connector_type_name 
473         pair"""
474         if guid not in self._connect:
475             return []
476         # all connections for all connectors for guid
477         all_connections = self._connect[guid]
478         if connector_type_name not in all_connections:
479             return []
480         # all connections for the specific connector
481         connections = all_connections[connector_type_name]
482         specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
483                 in connections.iteritems() if \
484                 otr_connector_type_name == other_connector_type_name]
485         return specific_connections
486
487     def _get_connection_count(self, guid, connection_type_name):
488         count = 0
489         cross_count = 0
490         if guid in self._connect and connection_type_name in \
491                 self._connect[guid]:
492             count = len(self._connect[guid][connection_type_name])
493         if guid in self._cross_connect and connection_type_name in \
494                 self._cross_connect[guid]:
495             cross_count = len(self._cross_connect[guid][connection_type_name])
496         return count + cross_count
497
498     def _get_traces(self, guid):
499         return [] if guid not in self._add_trace else self._add_trace[guid]
500
501     def _get_parameters(self, guid):
502         return dict() if guid not in self._create_set else \
503                 self._create_set[guid]
504
505     def _get_factory(self, guid):
506         factory_id = self._create[guid]
507         return self._factories[factory_id]
508
509     def _get_factory_id(self, guid):
510         """ Returns the factory ID of the (perhaps not yet) created object """
511         return self._create.get(guid, None)
512
513     def _validate_guid(self, guid):
514         if not guid in self._create:
515             raise RuntimeError("Element guid %d doesn't exist" % guid)
516
517     def _validate_not_guid(self, guid):
518         if guid in self._create:
519             raise AttributeError("Cannot add elements with the same guid: %d" %
520                     guid)
521
522     def _validate_factory_id(self, factory_id):
523         if factory_id not in self._factories:
524             raise AttributeError("Invalid element type %s for testbed version %s" %
525                     (factory_id, self._testbed_version))
526
527     def _validate_testbed_attribute(self, name):
528         if not self._attributes.has_attribute(name):
529             raise AttributeError("Invalid testbed attribute %s for testbed" % \
530                     name)
531
532     def _validate_testbed_value(self, name, value):
533         if not self._attributes.is_attribute_value_valid(name, value):
534             raise AttributeError("Invalid value %s for testbed attribute %s" % \
535                 (value, name))
536
537     def _validate_box_attribute(self, guid, name):
538         factory = self._get_factory(guid)
539         if not factory.box_attributes.has_attribute(name):
540             raise AttributeError("Invalid attribute %s for element type %s" %
541                     (name, factory.factory_id))
542
543     def _validate_box_value(self, guid, name, value):
544         factory = self._get_factory(guid)
545         if not factory.box_attributes.is_attribute_value_valid(name, value):
546             raise AttributeError("Invalid value %s for attribute %s" % \
547                 (value, name))
548
549     def _validate_factory_attribute(self, guid, name):
550         factory = self._get_factory(guid)
551         if not factory.has_attribute(name):
552             raise AttributeError("Invalid attribute %s for element type %s" %
553                     (name, factory.factory_id))
554
555     def _validate_factory_value(self, guid, name, value):
556         factory = self._get_factory(guid)
557         if not factory.is_attribute_value_valid(name, value):
558             raise AttributeError("Invalid value %s for attribute %s" % \
559                 (value, name))
560
561     def _validate_trace(self, guid, trace_name):
562         factory = self._get_factory(guid)
563         if not trace_name in factory.traces_list:
564             raise RuntimeError("Element type '%s' has no trace '%s'" %
565                     (factory.factory_id, trace_name))
566
567     def _validate_allow_addresses(self, guid):
568         factory = self._get_factory(guid)
569         if not factory.allow_addresses:
570             raise RuntimeError("Element type '%s' doesn't support addresses" %
571                     factory.factory_id)
572         attr_name = "maxAddresses"
573         if guid in self._create_set and attr_name in self._create_set[guid]:
574             max_addresses = self._create_set[guid][attr_name]
575         else:
576             factory = self._get_factory(guid)
577             max_addresses = factory.box_attributes.get_attribute_value(attr_name)
578         if guid in self._add_address:
579             count_addresses = len(self._add_address[guid])
580             if max_addresses == count_addresses:
581                 raise RuntimeError("Element guid %d of type '%s' can't accept \
582                         more addresses" % (guid, factory.factory_id))
583
584     def _validate_allow_routes(self, guid):
585         factory = self._get_factory(guid)
586         if not factory.allow_routes:
587             raise RuntimeError("Element type '%s' doesn't support routes" %
588                     factory.factory_id)
589
590     def _validate_connection(self, guid1, connector_type_name1, guid2, 
591             connector_type_name2, cross = False):
592         # can't connect with self
593         if guid1 == guid2:
594             raise AttributeError("Can't connect guid %d to self" % \
595                 (guid1))
596         # the connection is already done, so ignore
597         connected = self.get_connected(guid1, connector_type_name1, 
598                 connector_type_name2)
599         if guid2 in connected:
600             return
601         count1 = self._get_connection_count(guid1, connector_type_name1)
602         factory1 = self._get_factory(guid1)
603         connector_type1 = factory1.connector_type(connector_type_name1)
604         if count1 == connector_type1.max:
605             raise AttributeError("Connector %s is full for guid %d" % \
606                 (connector_type_name1, guid1))
607
608     def _validate_modify_box_value(self, guid, name):
609         factory = self._get_factory(guid)
610         if self._status > TS.STATUS_STARTED and \
611                 (factory.box_attributes.is_attribute_exec_read_only(name) or \
612                 factory.box_attributes.is_attribute_exec_immutable(name)):
613             raise AttributeError("Attribute %s can only be modified during experiment design" % name)
614