merged ConnectorTyper for design and execution
[nepi.git] / src / nepi / core / testbed_impl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8         ApplicationStatus as AS, \
9         TestbedStatus as TS, \
10         CONNECTION_DELAY
11
12 import collections
13 import copy
14
15 class TestbedController(execute.TestbedController):
16     def __init__(self, testbed_id, testbed_version):
17         super(TestbedController, self).__init__(testbed_id, testbed_version)
18         self._status = TS.STATUS_ZERO
19         # testbed attributes for validation
20         self._attributes = None
21         # element factories for validation
22         self._factories = dict()
23
24         # experiment construction instructions
25         self._create = dict()
26         self._create_set = dict()
27         self._factory_set = dict()
28         self._connect = dict()
29         self._cross_connect = dict()
30         self._add_trace = dict()
31         self._add_address = dict()
32         self._add_route = dict()
33         self._configure = dict()
34
35         # log of set operations
36         self._setlog = dict()
37         # last set operations
38         self._set = dict()
39
40         # testbed element instances
41         self._elements = dict()
42
43         self._metadata = Metadata(self._testbed_id, self._testbed_version)
44         for factory in self._metadata.build_execute_factories():
45             self._factories[factory.factory_id] = factory
46         self._attributes = self._metadata.testbed_attributes()
47         self._root_directory = None
48     
49     @property
50     def root_directory(self):
51         return self._root_directory
52
53     @property
54     def guids(self):
55         return self._create.keys()
56
57     @property
58     def elements(self):
59         return self._elements
60     
61     def _get_factory_id(self, guid):
62         """ Returns the factory ID of the (perhaps not yet) created object """
63         return self._create.get(guid, None)
64
65     def defer_configure(self, name, value):
66         if not self._attributes.has_attribute(name):
67             raise AttributeError("Invalid attribute %s for testbed" % name)
68         # Validation
69         self._attributes.set_attribute_value(name, value)
70         self._configure[name] = value
71
72     def defer_create(self, guid, factory_id):
73         if factory_id not in self._factories:
74             raise AttributeError("Invalid element type %s for testbed version %s" %
75                     (factory_id, self._testbed_version))
76         if guid in self._create:
77             raise AttributeError("Cannot add elements with the same guid: %d" %
78                     guid)
79         self._create[guid] = factory_id
80
81     def defer_create_set(self, guid, name, value):
82         if not guid in self._create:
83             raise RuntimeError("Element guid %d doesn't exist" % guid)
84         factory = self._get_factory(guid)
85         if not factory.box_attributes.has_attribute(name):
86             raise AttributeError("Invalid attribute %s for element type %s" %
87                     (name, factory.factory_id))
88         if not factory.box_attributes.is_attribute_value_valid(name, value):
89             raise AttributeError("Invalid value %s for attribute %s" % \
90                 (value, name))
91         if guid not in self._create_set:
92             self._create_set[guid] = dict()
93         self._create_set[guid][name] = value
94
95     def defer_factory_set(self, guid, name, value):
96         if not guid in self._create:
97             raise RuntimeError("Element guid %d doesn't exist" % guid)
98         factory = self._get_factory(guid)
99         if not factory.has_attribute(name):
100             raise AttributeError("Invalid attribute %s for element type %s" %
101                     (name, factory.factory_id))
102         if not factory.is_attribute_value_valid(name, value):
103             raise AttributeError("Invalid value %s for attribute %s" % \
104                 (value, name))
105         if guid not in self._factory_set:
106             self._factory_set[guid] = dict()
107         self._factory_set[guid][name] = value
108
109     def defer_connect(self, guid1, connector_type_name1, guid2, 
110             connector_type_name2):
111         factory1 = self._get_factory(guid1)
112         factory_id2 = self._create[guid2]
113         # TODO VALIDATE!!!
114         #if self.box.guid == connector.box.guid:
115         #    return False
116         #if self.is_full() or connector.is_full():
117         #    return False
118         #if self.is_connected(connector):
119         #    return False
120         #count = self._get_connection_count(guid1, connector_type_name1)
121         connector_type = factory1.connector_type(connector_type_name1)
122         connector_type.can_connect(self._testbed_id, factory_id2, 
123                 connector_type_name2, False)
124         if not guid1 in self._connect:
125             self._connect[guid1] = dict()
126         if not connector_type_name1 in self._connect[guid1]:
127              self._connect[guid1][connector_type_name1] = dict()
128         self._connect[guid1][connector_type_name1][guid2] = \
129                connector_type_name2
130         if not guid2 in self._connect:
131             self._connect[guid2] = dict()
132         if not connector_type_name2 in self._connect[guid2]:
133              self._connect[guid2][connector_type_name2] = dict()
134         self._connect[guid2][connector_type_name2][guid1] = \
135                 connector_type_name1
136
137     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
138             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
139             cross_connector_type_name):
140         factory = self._get_factory(guid)
141         # TODO VALIDATE!!!
142         #if self.box.guid == connector.box.guid:
143         #    return False
144         #if self.is_full() or connector.is_full():
145         #    return False
146         #if self.is_connected(connector):
147         #    return False
148         #count = self._get_connection_count(guid, connector_type_name)
149         connector_type = factory.connector_type(connector_type_name)
150         connector_type.can_connect(cross_testbed_id, cross_factory_id, 
151                 cross_connector_type_name, True)
152         if not guid in self._cross_connect:
153             self._cross_connect[guid] = dict()
154         if not connector_type_name in self._cross_connect[guid]:
155              self._cross_connect[guid][connector_type_name] = dict()
156         self._cross_connect[guid][connector_type_name] = \
157                 (cross_guid, cross_testbed_guid, cross_testbed_id, 
158                 cross_factory_id, cross_connector_type_name)
159
160     def defer_add_trace(self, guid, trace_id):
161         if not guid in self._create:
162             raise RuntimeError("Element guid %d doesn't exist" % guid)
163         factory = self._get_factory(guid)
164         if not trace_id in factory.traces:
165             raise RuntimeError("Element type '%s' has no trace '%s'" %
166                     (factory.factory_id, trace_id))
167         if not guid in self._add_trace:
168             self._add_trace[guid] = list()
169         self._add_trace[guid].append(trace_id)
170
171     def defer_add_address(self, guid, address, netprefix, broadcast):
172         if not guid in self._create:
173             raise RuntimeError("Element guid %d doesn't exist" % guid)
174         factory = self._get_factory(guid)
175         if not factory.allow_addresses:
176             raise RuntimeError("Element type '%s' doesn't support addresses" %
177                     factory.factory_id)
178             max_addresses = 1 # TODO: MAKE THIS PARAMETRIZABLE
179         if guid in self._add_address:
180             count_addresses = len(self._add_address[guid])
181             if max_addresses == count_addresses:
182                 raise RuntimeError("Element guid %d of type '%s' can't accept \
183                         more addresses" % (guid, factory.factory_id))
184         else:
185             self._add_address[guid] = list()
186         self._add_address[guid].append((address, netprefix, broadcast))
187
188     def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
189         if not guid in self._create:
190             raise RuntimeError("Element guid %d doesn't exist" % guid)
191         factory = self._get_factory(guid)
192         if not factory.allow_routes:
193             raise RuntimeError("Element type '%s' doesn't support routes" %
194                     factory.factory_id)
195         if not guid in self._add_route:
196             self._add_route[guid] = list()
197         self._add_route[guid].append((destination, netprefix, nexthop, metric)) 
198
199     def do_setup(self):
200         self._root_directory = self._attributes.\
201             get_attribute_value("rootDirectory")
202         self._status = TS.STATUS_SETUP
203
204     def do_create(self):
205         def set_params(self, guid):
206             parameters = self._get_parameters(guid)
207             for name, value in parameters.iteritems():
208                 self.set(guid, name, value)
209             
210         self._do_in_factory_order(
211             'create_function',
212             self._metadata.create_order,
213             postaction = set_params )
214         self._status = TS.STATUS_CREATED
215
216     def _do_connect(self, init = True):
217         unconnected = copy.deepcopy(self._connect)
218         
219         while unconnected:
220             for guid1, connections in unconnected.items():
221                 factory1 = self._get_factory(guid1)
222                 for connector_type_name1, connections2 in connections.items():
223                     connector_type1 = factory1.connector_type(connector_type_name1)
224                     for guid2, connector_type_name2 in connections2.items():
225                         factory_id2 = self._create[guid2]
226                         # Connections are executed in a "From -> To" direction only
227                         # This explicitly ignores the "To -> From" (mirror) 
228                         # connections of every connection pair.
229                         if init:
230                             connect_code = connector_type1.connect_to_init_code(
231                                     self._testbed_id, factory_id2, 
232                                     connector_type_name2,
233                                     False)
234                         else:
235                             connect_code = connector_type1.connect_to_compl_code(
236                                     self._testbed_id, factory_id2, 
237                                     connector_type_name2,
238                                     False)
239                         delay = None
240                         if connect_code:
241                             delay = connect_code(self, guid1, guid2)
242
243                         if delay is not CONNECTION_DELAY:
244                             del unconnected[guid1][connector_type_name1][guid2]
245                     if not unconnected[guid1][connector_type_name1]:
246                         del unconnected[guid1][connector_type_name1]
247                 if not unconnected[guid1]:
248                     del unconnected[guid1]
249
250     def do_connect_init(self):
251         self._do_connect()
252
253     def do_connect_compl(self):
254         self._do_connect(init = False)
255         self._status = TS.STATUS_CONNECTED
256
257     def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
258         guids = collections.defaultdict(list)
259         # order guids (elements) according to factory_id
260         for guid, factory_id in self._create.iteritems():
261             guids[factory_id].append(guid)
262         # configure elements following the factory_id order
263         for factory_id in order:
264             # omit the factories that have no element to create
265             if factory_id not in guids:
266                 continue
267             factory = self._factories[factory_id]
268             if not getattr(factory, action):
269                 continue
270             for guid in guids[factory_id]:
271                 getattr(factory, action)(self, guid)
272                 if postaction:
273                     postaction(self, guid)
274             if poststep:
275                 for guid in guids[factory_id]:
276                     poststep(self, guid)
277
278     @staticmethod
279     def do_poststep_preconfigure(self, guid):
280         # dummy hook for implementations interested in
281         # two-phase configuration
282         pass
283
284     def do_preconfigure(self):
285         self._do_in_factory_order(
286             'preconfigure_function',
287             self._metadata.preconfigure_order,
288             poststep = self.do_poststep_preconfigure )
289
290     @staticmethod
291     def do_poststep_configure(self, guid):
292         # dummy hook for implementations interested in
293         # two-phase configuration
294         pass
295
296     def do_configure(self):
297         self._do_in_factory_order(
298             'configure_function',
299             self._metadata.configure_order,
300             poststep = self.do_poststep_configure )
301         self._status = TS.STATUS_CONFIGURED
302
303     def do_prestart(self):
304         self._do_in_factory_order(
305             'prestart_function',
306             self._metadata.prestart_order )
307
308     def _do_cross_connect(self, cross_data, init = True):
309         for guid, cross_connections in self._cross_connect.iteritems():
310             factory = self._get_factory(guid)
311             for connector_type_name, cross_connection in \
312                     cross_connections.iteritems():
313                 connector_type = factory.connector_type(connector_type_name)
314                 (cross_guid, cross_testbed_guid, cross_testbed_id,
315                     cross_factory_id, cross_connector_type_name) = cross_connection
316                 if init:
317                     connect_code = connector_type.connect_to_init_code(
318                         cross_testbed_id, cross_factory_id, 
319                         cross_connector_type_name,
320                         True)
321                 else:
322                     connect_code = connector_type.connect_to_compl_code(
323                         cross_testbed_id, cross_factory_id, 
324                         cross_connector_type_name,
325                         True)
326                 if connect_code:
327                     elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
328                     connect_code(self, guid, elem_cross_data)       
329
330     def do_cross_connect_init(self, cross_data):
331         self._do_cross_connect(cross_data)
332
333     def do_cross_connect_compl(self, cross_data):
334         self._do_cross_connect(cross_data, init = False)
335         self._status = TS.STATUS_CROSS_CONNECTED
336
337     def set(self, guid, name, value, time = TIME_NOW):
338         if not guid in self._create:
339             raise RuntimeError("Element guid %d doesn't exist" % guid)
340         factory = self._get_factory(guid)
341         if not factory.box_attributes.has_attribute(name):
342             raise AttributeError("Invalid attribute %s for element type %s" %
343                     (name, factory.factory_id))
344         if self._status > TS.STATUS_STARTED and \
345                 factory.box_attributes.is_attribute_design_only(name):
346             raise AttributeError("Attribute %s can only be modified during experiment design" % name)
347         if not factory.box_attributes.is_attribute_value_valid(name, value):
348             raise AttributeError("Invalid value %s for attribute %s" % \
349                     (value, name))
350         if guid not in self._set:
351             self._set[guid] = dict()
352             self._setlog[guid] = dict()
353         if time not in self._setlog[guid]:
354             self._setlog[guid][time] = dict()
355         self._setlog[guid][time][name] = value
356         self._set[guid][name] = value
357
358     def get(self, guid, name, time = TIME_NOW):
359         """
360         gets an attribute from box definitions if available. 
361         Throws KeyError if the GUID wasn't created
362         through the defer_create interface, and AttributeError if the
363         attribute isn't available (doesn't exist or is design-only)
364         """
365         if not guid in self._create:
366             raise KeyError, "Element guid %d doesn't exist" % guid
367         factory = self._get_factory(guid)
368         if not factory.box_attributes.has_attribute(name):
369             raise AttributeError, "Invalid attribute %s for element type %s" % \
370             (name, factory.factory_id)
371         if guid in self._set and name in self._set[guid]:
372             return self._set[guid][name]
373         if guid in self._create_set and name in self._create_set[guid]:
374             return self._create_set[guid][name]
375         return factory.box_attributes.get_attribute_value(name)
376
377     def get_route(self, guid, index, attribute):
378         """
379         returns information given to defer_add_route.
380         
381         Raises AttributeError if an invalid attribute is requested
382             or if the indexed routing rule does not exist.
383         
384         Raises KeyError if the GUID has not been seen by
385             defer_add_route
386         """
387         ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
388         
389         if attribute not in ATTRIBUTES:
390             raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
391         
392         attribute_index = ATTRIBUTES.index(attribute)
393         
394         routes = self._add_route.get(guid)
395         if not routes:
396             raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
397        
398         index = int(index)
399         if not (0 <= index < len(addresses)):
400             raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
401                 guid, self._testbed_id, index)
402         
403         return routes[index][attribute_index]
404
405     def get_address(self, guid, index, attribute='Address'):
406         """
407         returns information given to defer_add_address
408         
409         Raises AttributeError if an invalid attribute is requested
410             or if the indexed routing rule does not exist.
411         
412         Raises KeyError if the GUID has not been seen by
413             defer_add_address
414         """
415         ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
416         
417         if attribute not in ATTRIBUTES:
418             raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
419         
420         attribute_index = ATTRIBUTES.index(attribute)
421         
422         addresses = self._add_address.get(guid)
423         if not addresses:
424             raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
425         
426         index = int(index)
427         if not (0 <= index < len(addresses)):
428             raise AttributeError, "GUID %r at %s does not have an address #%s" % (
429                 guid, self._testbed_id, index)
430         
431         return addresses[index][attribute_index]
432
433     def get_attribute_list(self, guid):
434         factory = self._get_factory(guid)
435         attribute_list = list()
436         return factory.box_attributes.attributes_list
437
438     def get_factory_id(self, guid):
439         factory = self._get_factory(guid)
440         return factory.factory_id
441
442     def start(self, time = TIME_NOW):
443         self._do_in_factory_order(
444             'start_function',
445             self._metadata.start_order )
446         self._status = TS.STATUS_STARTED
447
448     #action: NotImplementedError
449
450     def stop(self, time = TIME_NOW):
451         self._do_in_factory_order(
452             'stop_function',
453             reversed(self._metadata.start_order) )
454         self._status = TS.STATUS_STOPPED
455
456     def status(self, guid = None):
457         if not guid:
458             return self._status
459         if not guid in self._create:
460             raise RuntimeError("Element guid %d doesn't exist" % guid)
461         factory = self._get_factory(guid)
462         status_function = factory.status_function
463         if status_function:
464             return status_function(self, guid)
465         return AS.STATUS_UNDETERMINED
466
467     def trace(self, guid, trace_id, attribute='value'):
468         if attribute == 'value':
469             fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
470             content = fd.read()
471             fd.close()
472         elif attribute == 'path':
473             content = self.trace_filepath(guid, trace_id)
474         else:
475             content = None
476         return content
477
478     def traces_info(self):
479         traces_info = dict()
480         host = self._attributes.get_attribute_value("deployment_host")
481         user = self._attributes.get_attribute_value("deployment_user")
482         for guid, trace_list in self._add_trace.iteritems(): 
483             traces_info[guid] = dict()
484             for trace_id in trace_list:
485                 traces_info[guid][trace_id] = dict()
486                 filepath = self.trace(guid, trace_id, attribute = "path")
487                 traces_info[guid][trace_id]["host"] = host
488                 traces_info[guid][trace_id]["user"] = user
489                 traces_info[guid][trace_id]["filepath"] = filepath
490         return traces_info
491
492     def trace_filepath(self, guid, trace_id):
493         """
494         Return a trace's file path, for TestbedController's default 
495         implementation of trace()
496         """
497         raise NotImplementedError
498
499     #shutdown: NotImplementedError
500
501     def get_connected(self, guid, connector_type_name, 
502             other_connector_type_name):
503         """searchs the connected elements for the specific connector_type_name 
504         pair"""
505         if guid not in self._connect:
506             return []
507         # all connections for all connectors for guid
508         all_connections = self._connect[guid]
509         if connector_type_name not in all_connections:
510             return []
511         # all connections for the specific connector
512         connections = all_connections[connector_type_name]
513         specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
514                 in connections.iteritems() if \
515                 otr_connector_type_name == other_connector_type_name]
516         return specific_connections
517
518     def _get_connection_count(self, guid, connection_type_name):
519         count = 0
520         cross_count = 0
521         if guid in self._connect and connection_type_name in \
522                 self._connect[guid]:
523             count = len(self._connect[guid][connection_type_name])
524         if guid in self._cross_connect and connection_type_name in \
525                 self._cross_connect[guid]:
526             cross_count = len(self._cross_connect[guid][connection_type_name])
527         return count + cross_count
528
529     def _get_traces(self, guid):
530         return [] if guid not in self._add_trace else self._add_trace[guid]
531
532     def _get_parameters(self, guid):
533         return dict() if guid not in self._create_set else \
534                 self._create_set[guid]
535
536     def _get_factory(self, guid):
537         factory_id = self._create[guid]
538         return self._factories[factory_id]
539