Added routes to OMF nodes
[nepi.git] / src / nepi / core / testbed_impl.py
1 # -*- coding: utf-8 -*-
2
3 from nepi.core import execute
4 from nepi.core.metadata import Metadata, Parallel
5 from nepi.util import validation
6 from nepi.util.constants import TIME_NOW, \
7         ApplicationStatus as AS, \
8         TestbedStatus as TS, \
9         CONNECTION_DELAY
10 from nepi.util.parallel import ParallelRun
11
12 import collections
13 import copy
14 import logging
15
16 class TestbedController(execute.TestbedController):
17     def __init__(self, testbed_id, testbed_version):
18         super(TestbedController, self).__init__(testbed_id, testbed_version)
19         self._status = TS.STATUS_ZERO
20         # testbed attributes for validation
21         self._attributes = None
22         # element factories for validation
23         self._factories = dict()
24
25         # experiment construction instructions
26         self._create = dict()
27         self._create_set = dict()
28         self._factory_set = dict()
29         self._connect = dict()
30         self._cross_connect = dict()
31         self._add_trace = dict()
32         self._add_address = dict()
33         self._add_route = dict()
34         self._configure = dict()
35
36         # log of set operations
37         self._setlog = dict()
38         # last set operations
39         self._set = dict()
40
41         # testbed element instances
42         self._elements = dict()
43
44         self._metadata = Metadata(self._testbed_id)
45         if self._metadata.testbed_version != testbed_version:
46             raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
47                     (testbed_id, testbed_version, self._metadata.testbed_version))
48         for factory in self._metadata.build_factories():
49             self._factories[factory.factory_id] = factory
50         self._attributes = self._metadata.testbed_attributes()
51         self._root_directory = None
52         
53         # Logging
54         self._logger = logging.getLogger("nepi.core.testbed_impl")
55     
56     @property
57     def root_directory(self):
58         return self._root_directory
59
60     @property
61     def guids(self):
62         return self._create.keys()
63
64     @property
65     def elements(self):
66         return self._elements
67     
68     def defer_configure(self, name, value):
69         self._validate_testbed_attribute(name)
70         self._validate_testbed_value(name, value)
71         self._attributes.set_attribute_value(name, value)
72         self._configure[name] = value
73
74     def defer_create(self, guid, factory_id):
75         self._validate_factory_id(factory_id)
76         self._validate_not_guid(guid)
77         self._create[guid] = factory_id
78
79     def defer_create_set(self, guid, name, value):
80         self._validate_guid(guid)
81         self._validate_box_attribute(guid, name)
82         self._validate_box_value(guid, name, value)
83         if guid not in self._create_set:
84             self._create_set[guid] = dict()
85         self._create_set[guid][name] = value
86
87     def defer_factory_set(self, guid, name, value):
88         self._validate_guid(guid)
89         self._validate_factory_attribute(guid, name)
90         self._validate_factory_value(guid, name, value)
91         if guid not in self._factory_set:
92             self._factory_set[guid] = dict()
93         self._factory_set[guid][name] = value
94
95     def defer_connect(self, guid1, connector_type_name1, guid2, 
96             connector_type_name2):
97         self._validate_guid(guid1)
98         self._validate_guid(guid2)
99         factory1 = self._get_factory(guid1)
100         factory_id2 = self._create[guid2]
101         connector_type = factory1.connector_type(connector_type_name1)
102         connector_type.can_connect(self._testbed_id, factory_id2, 
103                 connector_type_name2, False)
104         self._validate_connection(guid1, connector_type_name1, guid2, 
105             connector_type_name2)
106
107         if not guid1 in self._connect:
108             self._connect[guid1] = dict()
109         if not connector_type_name1 in self._connect[guid1]:
110              self._connect[guid1][connector_type_name1] = dict()
111         self._connect[guid1][connector_type_name1][guid2] = \
112                connector_type_name2
113         if not guid2 in self._connect:
114             self._connect[guid2] = dict()
115         if not connector_type_name2 in self._connect[guid2]:
116              self._connect[guid2][connector_type_name2] = dict()
117         self._connect[guid2][connector_type_name2][guid1] = \
118                connector_type_name1
119
120     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
121             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
122             cross_connector_type_name):
123         self._validate_guid(guid)
124         factory = self._get_factory(guid)
125         connector_type = factory.connector_type(connector_type_name)
126         connector_type.can_connect(cross_testbed_id, cross_factory_id, 
127                 cross_connector_type_name, True)
128         self._validate_connection(guid, connector_type_name, cross_guid, 
129             cross_connector_type_name)
130
131         if not guid in self._cross_connect:
132             self._cross_connect[guid] = dict()
133         if not connector_type_name in self._cross_connect[guid]:
134              self._cross_connect[guid][connector_type_name] = dict()
135         self._cross_connect[guid][connector_type_name] = \
136                 (cross_guid, cross_testbed_guid, cross_testbed_id, 
137                 cross_factory_id, cross_connector_type_name)
138
139     def defer_add_trace(self, guid, trace_name):
140         self._validate_guid(guid)
141         self._validate_trace(guid, trace_name)
142         if not guid in self._add_trace:
143             self._add_trace[guid] = list()
144         self._add_trace[guid].append(trace_name)
145
146     def defer_add_address(self, guid, address, netprefix, broadcast):
147         self._validate_guid(guid)
148         self._validate_allow_addresses(guid)
149         if guid not in self._add_address:
150             self._add_address[guid] = list()
151         self._add_address[guid].append((address, netprefix, broadcast))
152
153     def defer_add_route(self, guid, destination, netprefix, nexthop, 
154             metric = 0, device = None):
155         self._validate_guid(guid)
156         self._validate_allow_routes(guid)
157         if not guid in self._add_route:
158             self._add_route[guid] = list()
159         self._add_route[guid].append((destination, netprefix, nexthop, 
160             metric, device)) 
161
162     def do_setup(self):
163         self._root_directory = self._attributes.\
164             get_attribute_value("rootDirectory")
165         self._status = TS.STATUS_SETUP
166
167     def do_create(self):
168         def set_params(self, guid):
169             parameters = self._get_parameters(guid)
170             for name, value in parameters.iteritems():
171                 self.set(guid, name, value)
172             
173         self._do_in_factory_order(
174             'create_function',
175             self._metadata.create_order,
176             postaction = set_params )
177         self._status = TS.STATUS_CREATED
178
    def _do_connect(self, init = True):
        """
        Executes the connection code of every connection recorded by
        defer_connect().

        With init = True the code returned by connect_to_init_code() is
        used, otherwise the one returned by connect_to_compl_code().

        The work is done on a deep copy of self._connect, which is left
        untouched. The pending entries are walked over and over until
        none is left; an entry stays pending only while its connection
        code returns CONNECTION_DELAY.
        """
        # work on a private copy: entries are deleted as they complete
        unconnected = copy.deepcopy(self._connect)
        
        # keep making passes while some connection is still pending
        while unconnected:
            # NOTE(review): entries are deleted from these dictionaries
            # inside the loops; this relies on .items() returning a
            # list snapshot (Python 2 behaviour).
            for guid1, connections in unconnected.items():
                factory1 = self._get_factory(guid1)
                for connector_type_name1, connections2 in connections.items():
                    connector_type1 = factory1.connector_type(connector_type_name1)
                    for guid2, connector_type_name2 in connections2.items():
                        factory_id2 = self._create[guid2]
                        # Connections are executed in a "From -> To" direction only
                        # This explicitly ignores the "To -> From" (mirror) 
                        # connections of every connection pair.
                        # NOTE(review): nothing here skips the mirror entry;
                        # presumably no connection code is returned for that
                        # direction - confirm against the connector metadata.
                        if init:
                            connect_code = connector_type1.connect_to_init_code(
                                    self._testbed_id, factory_id2, 
                                    connector_type_name2,
                                    False)
                        else:
                            connect_code = connector_type1.connect_to_compl_code(
                                    self._testbed_id, factory_id2, 
                                    connector_type_name2,
                                    False)
                        # entries without connection code are simply dropped
                        delay = None
                        if connect_code:
                            delay = connect_code(self, guid1, guid2)

                        # anything other than CONNECTION_DELAY means the
                        # connection is done and must not be retried
                        if delay is not CONNECTION_DELAY:
                            del unconnected[guid1][connector_type_name1][guid2]
                    # prune connectors and elements that have nothing pending
                    if not unconnected[guid1][connector_type_name1]:
                        del unconnected[guid1][connector_type_name1]
                if not unconnected[guid1]:
                    del unconnected[guid1]
212
213     def do_connect_init(self):
214         self._do_connect()
215
216     def do_connect_compl(self):
217         self._do_connect(init = False)
218         self._status = TS.STATUS_CONNECTED
219
    def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
        """
        Applies action to every created element, one factory at a
        time, following the given factory order.

        action: either the name of a factory attribute holding a
            callable (looked up with getattr on each factory) or a
            callable itself; it is invoked as f(self, guid).
        order: iterable of factory ids. An item may be wrapped in
            Parallel, in which case the elements of that factory are
            handed to a ParallelRun instead of being processed inline.
        postaction: optional callable invoked as f(self, guid) right
            after the action of each element.
        poststep: optional callable invoked as f(self, guid) for each
            element of a factory once all the actions of that factory
            have been performed or scheduled and synced.

        Factories without elements are skipped, and so are factories
        whose named action attribute is false; in both cases poststep
        is not run for them either.
        """
        logger = self._logger
        
        guids = collections.defaultdict(list)
        # order guids (elements) according to factory_id
        for guid, factory_id in self._create.iteritems():
            guids[factory_id].append(guid)
        
        # configure elements following the factory_id order
        for factory_id in order:
            # Create a parallel runner if we're given a Parallel() wrapper
            runner = None
            if isinstance(factory_id, Parallel):
                runner = ParallelRun(factory_id.maxthreads)
                factory_id = factory_id.factory
            
            # omit the factories that have no element to create
            if factory_id not in guids:
                continue
            
            # configure action
            factory = self._factories[factory_id]
            # a named action that the factory leaves unset (false value)
            # means there is nothing to do for this factory
            if isinstance(action, basestring) and not getattr(factory, action):
                continue
            # closure over the current factory; it is only used within
            # this iteration of the loop
            def perform_action(guid):
                if isinstance(action, basestring):
                    getattr(factory, action)(self, guid)
                else:
                    action(self, guid)
                if postaction:
                    postaction(self, guid)

            # perform the action on all elements, in parallel if so requested
            if runner:
                logger.debug("TestbedController: Starting parallel %s", action)
                runner.start()

            for guid in guids[factory_id]:
                if runner:
                    logger.debug("TestbedController: Scheduling %s on %s", action, guid)
                    runner.put(perform_action, guid)
                else:
                    logger.debug("TestbedController: Performing %s on %s", action, guid)
                    perform_action(guid)

            # sync
            # (presumably waits for the scheduled actions to finish
            # before the post hooks are queued - confirm in ParallelRun)
            if runner:
                runner.sync()
            
            # post hook
            if poststep:
                for guid in guids[factory_id]:
                    if runner:
                        logger.debug("TestbedController: Scheduling post-%s on %s", action, guid)
                        runner.put(poststep, self, guid)
                    else:
                        logger.debug("TestbedController: Performing post-%s on %s", action, guid)
                        poststep(self, guid)

            # sync
            # (the runner is joined before moving on to the next factory)
            if runner:
                runner.join()
                logger.debug("TestbedController: Finished parallel %s", action)
283
284     @staticmethod
285     def do_poststep_preconfigure(self, guid):
286         # dummy hook for implementations interested in
287         # two-phase configuration
288         pass
289
290     def do_preconfigure(self):
291         self._do_in_factory_order(
292             'preconfigure_function',
293             self._metadata.preconfigure_order,
294             poststep = self.do_poststep_preconfigure )
295
296     @staticmethod
297     def do_poststep_configure(self, guid):
298         # dummy hook for implementations interested in
299         # two-phase configuration
300         pass
301
302     def do_configure(self):
303         self._do_in_factory_order(
304             'configure_function',
305             self._metadata.configure_order,
306             poststep = self.do_poststep_configure )
307         self._status = TS.STATUS_CONFIGURED
308
309     def do_prestart(self):
310         self._do_in_factory_order(
311             'prestart_function',
312             self._metadata.prestart_order )
313
314     def _do_cross_connect(self, cross_data, init = True):
315         for guid, cross_connections in self._cross_connect.iteritems():
316             factory = self._get_factory(guid)
317             for connector_type_name, cross_connection in \
318                     cross_connections.iteritems():
319                 connector_type = factory.connector_type(connector_type_name)
320                 (cross_guid, cross_testbed_guid, cross_testbed_id,
321                     cross_factory_id, cross_connector_type_name) = cross_connection
322                 if init:
323                     connect_code = connector_type.connect_to_init_code(
324                         cross_testbed_id, cross_factory_id, 
325                         cross_connector_type_name,
326                         True)
327                 else:
328                     connect_code = connector_type.connect_to_compl_code(
329                         cross_testbed_id, cross_factory_id, 
330                         cross_connector_type_name,
331                         True)
332                 if connect_code:
333                     if hasattr(connect_code, "func"):
334                         func_name = connect_code.func.__name__
335                     elif hasattr(connect_code, "__name__"):
336                         func_name = connect_code.__name__
337                     else:
338                         func_name = repr(connect_code)
339                     self._logger.debug("Cross-connect - guid: %d, connect_code: %s " % (
340                         guid, func_name))
341                     elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
342                     connect_code(self, guid, elem_cross_data)       
343
344     def do_cross_connect_init(self, cross_data):
345         self._do_cross_connect(cross_data)
346
347     def do_cross_connect_compl(self, cross_data):
348         self._do_cross_connect(cross_data, init = False)
349         self._status = TS.STATUS_CROSS_CONNECTED
350
351     def set(self, guid, name, value, time = TIME_NOW):
352         self._validate_guid(guid)
353         self._validate_box_attribute(guid, name)
354         self._validate_box_value(guid, name, value)
355         self._validate_modify_box_value(guid, name)
356         if guid not in self._set:
357             self._set[guid] = dict()
358             self._setlog[guid] = dict()
359         if time not in self._setlog[guid]:
360             self._setlog[guid][time] = dict()
361         self._setlog[guid][time][name] = value
362         self._set[guid][name] = value
363
364     def get(self, guid, name, time = TIME_NOW):
365         """
366         gets an attribute from box definitions if available. 
367         Throws KeyError if the GUID wasn't created
368         through the defer_create interface, and AttributeError if the
369         attribute isn't available (doesn't exist or is design-only)
370         """
371         self._validate_guid(guid)
372         self._validate_box_attribute(guid, name)
373         if guid in self._set and name in self._set[guid]:
374             return self._set[guid][name]
375         if guid in self._create_set and name in self._create_set[guid]:
376             return self._create_set[guid][name]
377         # if nothing else found, returns the factory default value
378         factory = self._get_factory(guid)
379         return factory.box_attributes.get_attribute_value(name)
380
381     def get_route(self, guid, index, attribute):
382         """
383         returns information given to defer_add_route.
384         
385         Raises AttributeError if an invalid attribute is requested
386             or if the indexed routing rule does not exist.
387         
388         Raises KeyError if the GUID has not been seen by
389             defer_add_route
390         """
391         ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
392         
393         if attribute not in ATTRIBUTES:
394             raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
395         
396         attribute_index = ATTRIBUTES.index(attribute)
397         
398         routes = self._add_route.get(guid)
399         if not routes:
400             raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
401        
402         index = int(index)
403         if not (0 <= index < len(addresses)):
404             raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
405                 guid, self._testbed_id, index)
406         
407         return routes[index][attribute_index]
408
409     def get_address(self, guid, index, attribute='Address'):
410         """
411         returns information given to defer_add_address
412         
413         Raises AttributeError if an invalid attribute is requested
414             or if the indexed routing rule does not exist.
415         
416         Raises KeyError if the GUID has not been seen by
417             defer_add_address
418         """
419         ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
420         
421         if attribute not in ATTRIBUTES:
422             raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
423         
424         attribute_index = ATTRIBUTES.index(attribute)
425         
426         addresses = self._add_address.get(guid)
427         if not addresses:
428             raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
429         
430         index = int(index)
431         if not (0 <= index < len(addresses)):
432             raise AttributeError, "GUID %r at %s does not have an address #%s" % (
433                 guid, self._testbed_id, index)
434         
435         return addresses[index][attribute_index]
436
437     def get_attribute_list(self, guid, filter_flags = None, exclude = False):
438         factory = self._get_factory(guid)
439         attribute_list = list()
440         return factory.box_attributes.get_attribute_list(filter_flags, exclude)
441
442     def get_factory_id(self, guid):
443         factory = self._get_factory(guid)
444         return factory.factory_id
445
446     def start(self, time = TIME_NOW):
447         self._do_in_factory_order(
448             'start_function',
449             self._metadata.start_order )
450         self._status = TS.STATUS_STARTED
451
452     #action: NotImplementedError
453
454     def stop(self, time = TIME_NOW):
455         self._do_in_factory_order(
456             'stop_function',
457             reversed(self._metadata.start_order) )
458         self._status = TS.STATUS_STOPPED
459
460     def status(self, guid = None):
461         if not guid:
462             return self._status
463         self._validate_guid(guid)
464         factory = self._get_factory(guid)
465         status_function = factory.status_function
466         if status_function:
467             return status_function(self, guid)
468         return AS.STATUS_UNDETERMINED
469     
470     def testbed_status(self):
471         return self._status
472
473     def trace(self, guid, trace_id, attribute='value'):
474         if attribute == 'value':
475             fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
476             content = fd.read()
477             fd.close()
478         elif attribute == 'path':
479             content = self.trace_filepath(guid, trace_id)
480         elif attribute == 'filename':
481             content = self.trace_filename(guid, trace_id)
482         else:
483             content = None
484         return content
485
486     def traces_info(self):
487         traces_info = dict()
488         host = self._attributes.get_attribute_value("deployment_host")
489         user = self._attributes.get_attribute_value("deployment_user")
490         for guid, trace_list in self._add_trace.iteritems(): 
491             traces_info[guid] = dict()
492             for trace_id in trace_list:
493                 traces_info[guid][trace_id] = dict()
494                 filepath = self.trace(guid, trace_id, attribute = "path")
495                 traces_info[guid][trace_id]["host"] = host
496                 traces_info[guid][trace_id]["user"] = user
497                 traces_info[guid][trace_id]["filepath"] = filepath
498         return traces_info
499
500     def trace_filepath(self, guid, trace_id):
501         """
502         Return a trace's file path, for TestbedController's default 
503         implementation of trace()
504         """
505         raise NotImplementedError
506
507     def trace_filename(self, guid, trace_id):
508         """
509         Return a trace's file name, for TestbedController's default 
510         implementation of trace()
511         """
512         raise NotImplementedError
513
514     #shutdown: NotImplementedError
515
516     def get_connected(self, guid, connector_type_name, 
517             other_connector_type_name):
518         """searchs the connected elements for the specific connector_type_name 
519         pair"""
520         if guid not in self._connect:
521             return []
522         # all connections for all connectors for guid
523         all_connections = self._connect[guid]
524         if connector_type_name not in all_connections:
525             return []
526         # all connections for the specific connector
527         connections = all_connections[connector_type_name]
528         specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
529                 in connections.iteritems() if \
530                 otr_connector_type_name == other_connector_type_name]
531         return specific_connections
532
533     def _get_connection_count(self, guid, connection_type_name):
534         count = 0
535         cross_count = 0
536         if guid in self._connect and connection_type_name in \
537                 self._connect[guid]:
538             count = len(self._connect[guid][connection_type_name])
539         if guid in self._cross_connect and connection_type_name in \
540                 self._cross_connect[guid]:
541             cross_count = len(self._cross_connect[guid][connection_type_name])
542         return count + cross_count
543
544     def _get_traces(self, guid):
545         return [] if guid not in self._add_trace else self._add_trace[guid]
546
547     def _get_parameters(self, guid):
548         return dict() if guid not in self._create_set else \
549                 self._create_set[guid]
550
551     def _get_factory(self, guid):
552         factory_id = self._create[guid]
553         return self._factories[factory_id]
554
555     def _get_factory_id(self, guid):
556         """ Returns the factory ID of the (perhaps not yet) created object """
557         return self._create.get(guid, None)
558
559     def _validate_guid(self, guid):
560         if not guid in self._create:
561             raise RuntimeError("Element guid %d doesn't exist" % guid)
562
563     def _validate_not_guid(self, guid):
564         if guid in self._create:
565             raise AttributeError("Cannot add elements with the same guid: %d" %
566                     guid)
567
568     def _validate_factory_id(self, factory_id):
569         if factory_id not in self._factories:
570             raise AttributeError("Invalid element type %s for testbed version %s" %
571                     (factory_id, self._testbed_version))
572
573     def _validate_testbed_attribute(self, name):
574         if not self._attributes.has_attribute(name):
575             raise AttributeError("Invalid testbed attribute %s for testbed" % \
576                     name)
577
578     def _validate_testbed_value(self, name, value):
579         if not self._attributes.is_attribute_value_valid(name, value):
580             raise AttributeError("Invalid value %r for testbed attribute %s" % \
581                 (value, name))
582
583     def _validate_box_attribute(self, guid, name):
584         factory = self._get_factory(guid)
585         if not factory.box_attributes.has_attribute(name):
586             raise AttributeError("Invalid attribute %s for element type %s" %
587                     (name, factory.factory_id))
588
589     def _validate_box_value(self, guid, name, value):
590         factory = self._get_factory(guid)
591         if not factory.box_attributes.is_attribute_value_valid(name, value):
592             raise AttributeError("Invalid value %r for attribute %s" % \
593                 (value, name))
594
595     def _validate_factory_attribute(self, guid, name):
596         factory = self._get_factory(guid)
597         if not factory.has_attribute(name):
598             raise AttributeError("Invalid attribute %s for element type %s" %
599                     (name, factory.factory_id))
600
601     def _validate_factory_value(self, guid, name, value):
602         factory = self._get_factory(guid)
603         if not factory.is_attribute_value_valid(name, value):
604             raise AttributeError("Invalid value %r for attribute %s" % \
605                 (value, name))
606
607     def _validate_trace(self, guid, trace_name):
608         factory = self._get_factory(guid)
609         if not trace_name in factory.traces_list:
610             raise RuntimeError("Element type '%s' has no trace '%s'" %
611                     (factory.factory_id, trace_name))
612
613     def _validate_allow_addresses(self, guid):
614         factory = self._get_factory(guid)
615         if not factory.allow_addresses:
616             raise RuntimeError("Element type '%s' doesn't support addresses" %
617                     factory.factory_id)
618         attr_name = "maxAddresses"
619         if guid in self._create_set and attr_name in self._create_set[guid]:
620             max_addresses = self._create_set[guid][attr_name]
621         else:
622             factory = self._get_factory(guid)
623             max_addresses = factory.box_attributes.get_attribute_value(attr_name)
624         if guid in self._add_address:
625             count_addresses = len(self._add_address[guid])
626             if max_addresses == count_addresses:
627                 raise RuntimeError("Element guid %d of type '%s' can't accept \
628                         more addresses" % (guid, factory.factory_id))
629
630     def _validate_allow_routes(self, guid):
631         factory = self._get_factory(guid)
632         if not factory.allow_routes:
633             raise RuntimeError("Element type '%s' doesn't support routes" %
634                     factory.factory_id)
635
636     def _validate_connection(self, guid1, connector_type_name1, guid2, 
637             connector_type_name2, cross = False):
638         # can't connect with self
639         if guid1 == guid2:
640             raise AttributeError("Can't connect guid %d to self" % \
641                 (guid1))
642         # the connection is already done, so ignore
643         connected = self.get_connected(guid1, connector_type_name1, 
644                 connector_type_name2)
645         if guid2 in connected:
646             return
647         count1 = self._get_connection_count(guid1, connector_type_name1)
648         factory1 = self._get_factory(guid1)
649         connector_type1 = factory1.connector_type(connector_type_name1)
650         if count1 == connector_type1.max:
651             raise AttributeError("Connector %s is full for guid %d" % \
652                 (connector_type_name1, guid1))
653
654     def _validate_modify_box_value(self, guid, name):
655         factory = self._get_factory(guid)
656         if self._status > TS.STATUS_STARTED and \
657                 (factory.box_attributes.is_attribute_exec_read_only(name) or \
658                 factory.box_attributes.is_attribute_exec_immutable(name)):
659             raise AttributeError("Attribute %s can only be modified during experiment design" % name)
660