4ee31107ec08636d463f0944c347559840c0366a
[nepi.git] / src / nepi / core / testbed_impl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from nepi.core import execute
5 from nepi.core.metadata import Metadata, Parallel
6 from nepi.util import validation
7 from nepi.util.constants import TIME_NOW, \
8         ApplicationStatus as AS, \
9         TestbedStatus as TS, \
10         CONNECTION_DELAY
11 from nepi.util.parallel import ParallelRun
12
13 import collections
14 import copy
15 import logging
16
17 class TestbedController(execute.TestbedController):
18     def __init__(self, testbed_id, testbed_version):
19         super(TestbedController, self).__init__(testbed_id, testbed_version)
20         self._status = TS.STATUS_ZERO
21         # testbed attributes for validation
22         self._attributes = None
23         # element factories for validation
24         self._factories = dict()
25
26         # experiment construction instructions
27         self._create = dict()
28         self._create_set = dict()
29         self._factory_set = dict()
30         self._connect = dict()
31         self._cross_connect = dict()
32         self._add_trace = dict()
33         self._add_address = dict()
34         self._add_route = dict()
35         self._configure = dict()
36
37         # log of set operations
38         self._setlog = dict()
39         # last set operations
40         self._set = dict()
41
42         # testbed element instances
43         self._elements = dict()
44
45         self._metadata = Metadata(self._testbed_id)
46         if self._metadata.testbed_version != testbed_version:
47             raise RuntimeError("Bad testbed version on testbed %s. Asked for %s, got %s" % \
48                     (testbed_id, testbed_version, self._metadata.testbed_version))
49         for factory in self._metadata.build_factories():
50             self._factories[factory.factory_id] = factory
51         self._attributes = self._metadata.testbed_attributes()
52         self._root_directory = None
53         
54         # Logging
55         self._logger = logging.getLogger("nepi.core.testbed_impl")
56     
57     @property
58     def root_directory(self):
59         return self._root_directory
60
61     @property
62     def guids(self):
63         return self._create.keys()
64
65     @property
66     def elements(self):
67         return self._elements
68     
69     def defer_configure(self, name, value):
70         self._validate_testbed_attribute(name)
71         self._validate_testbed_value(name, value)
72         self._attributes.set_attribute_value(name, value)
73         self._configure[name] = value
74
75     def defer_create(self, guid, factory_id):
76         self._validate_factory_id(factory_id)
77         self._validate_not_guid(guid)
78         self._create[guid] = factory_id
79
80     def defer_create_set(self, guid, name, value):
81         self._validate_guid(guid)
82         self._validate_box_attribute(guid, name)
83         self._validate_box_value(guid, name, value)
84         if guid not in self._create_set:
85             self._create_set[guid] = dict()
86         self._create_set[guid][name] = value
87
88     def defer_factory_set(self, guid, name, value):
89         self._validate_guid(guid)
90         self._validate_factory_attribute(guid, name)
91         self._validate_factory_value(guid, name, value)
92         if guid not in self._factory_set:
93             self._factory_set[guid] = dict()
94         self._factory_set[guid][name] = value
95
96     def defer_connect(self, guid1, connector_type_name1, guid2, 
97             connector_type_name2):
98         self._validate_guid(guid1)
99         self._validate_guid(guid2)
100         factory1 = self._get_factory(guid1)
101         factory_id2 = self._create[guid2]
102         connector_type = factory1.connector_type(connector_type_name1)
103         connector_type.can_connect(self._testbed_id, factory_id2, 
104                 connector_type_name2, False)
105         self._validate_connection(guid1, connector_type_name1, guid2, 
106             connector_type_name2)
107
108         if not guid1 in self._connect:
109             self._connect[guid1] = dict()
110         if not connector_type_name1 in self._connect[guid1]:
111              self._connect[guid1][connector_type_name1] = dict()
112         self._connect[guid1][connector_type_name1][guid2] = \
113                connector_type_name2
114         if not guid2 in self._connect:
115             self._connect[guid2] = dict()
116         if not connector_type_name2 in self._connect[guid2]:
117              self._connect[guid2][connector_type_name2] = dict()
118         self._connect[guid2][connector_type_name2][guid1] = \
119                connector_type_name1
120
121     def defer_cross_connect(self, guid, connector_type_name, cross_guid, 
122             cross_testbed_guid, cross_testbed_id, cross_factory_id, 
123             cross_connector_type_name):
124         self._validate_guid(guid)
125         factory = self._get_factory(guid)
126         connector_type = factory.connector_type(connector_type_name)
127         connector_type.can_connect(cross_testbed_id, cross_factory_id, 
128                 cross_connector_type_name, True)
129         self._validate_connection(guid, connector_type_name, cross_guid, 
130             cross_connector_type_name)
131
132         if not guid in self._cross_connect:
133             self._cross_connect[guid] = dict()
134         if not connector_type_name in self._cross_connect[guid]:
135              self._cross_connect[guid][connector_type_name] = dict()
136         self._cross_connect[guid][connector_type_name] = \
137                 (cross_guid, cross_testbed_guid, cross_testbed_id, 
138                 cross_factory_id, cross_connector_type_name)
139
140     def defer_add_trace(self, guid, trace_name):
141         self._validate_guid(guid)
142         self._validate_trace(guid, trace_name)
143         if not guid in self._add_trace:
144             self._add_trace[guid] = list()
145         self._add_trace[guid].append(trace_name)
146
147     def defer_add_address(self, guid, address, netprefix, broadcast):
148         self._validate_guid(guid)
149         self._validate_allow_addresses(guid)
150         if guid not in self._add_address:
151             self._add_address[guid] = list()
152         self._add_address[guid].append((address, netprefix, broadcast))
153
154     def defer_add_route(self, guid, destination, netprefix, nexthop, metric = 0):
155         self._validate_guid(guid)
156         self._validate_allow_routes(guid)
157         if not guid in self._add_route:
158             self._add_route[guid] = list()
159         self._add_route[guid].append((destination, netprefix, nexthop, metric)) 
160
161     def do_setup(self):
162         self._root_directory = self._attributes.\
163             get_attribute_value("rootDirectory")
164         self._status = TS.STATUS_SETUP
165
166     def do_create(self):
167         def set_params(self, guid):
168             parameters = self._get_parameters(guid)
169             for name, value in parameters.iteritems():
170                 self.set(guid, name, value)
171             
172         self._do_in_factory_order(
173             'create_function',
174             self._metadata.create_order,
175             postaction = set_params )
176         self._status = TS.STATUS_CREATED
177
178     def _do_connect(self, init = True):
179         unconnected = copy.deepcopy(self._connect)
180         
181         while unconnected:
182             for guid1, connections in unconnected.items():
183                 factory1 = self._get_factory(guid1)
184                 for connector_type_name1, connections2 in connections.items():
185                     connector_type1 = factory1.connector_type(connector_type_name1)
186                     for guid2, connector_type_name2 in connections2.items():
187                         factory_id2 = self._create[guid2]
188                         # Connections are executed in a "From -> To" direction only
189                         # This explicitly ignores the "To -> From" (mirror) 
190                         # connections of every connection pair.
191                         if init:
192                             connect_code = connector_type1.connect_to_init_code(
193                                     self._testbed_id, factory_id2, 
194                                     connector_type_name2,
195                                     False)
196                         else:
197                             connect_code = connector_type1.connect_to_compl_code(
198                                     self._testbed_id, factory_id2, 
199                                     connector_type_name2,
200                                     False)
201                         delay = None
202                         if connect_code:
203                             delay = connect_code(self, guid1, guid2)
204
205                         if delay is not CONNECTION_DELAY:
206                             del unconnected[guid1][connector_type_name1][guid2]
207                     if not unconnected[guid1][connector_type_name1]:
208                         del unconnected[guid1][connector_type_name1]
209                 if not unconnected[guid1]:
210                     del unconnected[guid1]
211
212     def do_connect_init(self):
213         self._do_connect()
214
215     def do_connect_compl(self):
216         self._do_connect(init = False)
217         self._status = TS.STATUS_CONNECTED
218
219     def _do_in_factory_order(self, action, order, postaction = None, poststep = None):
220         logger = self._logger
221         
222         guids = collections.defaultdict(list)
223         # order guids (elements) according to factory_id
224         for guid, factory_id in self._create.iteritems():
225             guids[factory_id].append(guid)
226         
227         # configure elements following the factory_id order
228         for factory_id in order:
229             # Create a parallel runner if we're given a Parallel() wrapper
230             runner = None
231             if isinstance(factory_id, Parallel):
232                 runner = ParallelRun(factory_id.maxthreads)
233                 factory_id = factory_id.factory
234             
235             # omit the factories that have no element to create
236             if factory_id not in guids:
237                 continue
238             
239             # configure action
240             factory = self._factories[factory_id]
241             if isinstance(action, basestring) and not getattr(factory, action):
242                 continue
243             def perform_action(guid):
244                 if isinstance(action, basestring):
245                     getattr(factory, action)(self, guid)
246                 else:
247                     action(self, guid)
248                 if postaction:
249                     postaction(self, guid)
250
251             # perform the action on all elements, in parallel if so requested
252             if runner:
253                 logger.debug("TestbedController: Starting parallel %s", action)
254                 runner.start()
255
256             for guid in guids[factory_id]:
257                 if runner:
258                     logger.debug("TestbedController: Scheduling %s on %s", action, guid)
259                     runner.put(perform_action, guid)
260                 else:
261                     logger.debug("TestbedController: Performing %s on %s", action, guid)
262                     perform_action(guid)
263
264             # sync
265             if runner:
266                 runner.sync()
267             
268             # post hook
269             if poststep:
270                 for guid in guids[factory_id]:
271                     if runner:
272                         logger.debug("TestbedController: Scheduling post-%s on %s", action, guid)
273                         runner.put(poststep, self, guid)
274                     else:
275                         logger.debug("TestbedController: Performing post-%s on %s", action, guid)
276                         poststep(self, guid)
277
278             # sync
279             if runner:
280                 runner.join()
281                 logger.debug("TestbedController: Finished parallel %s", action)
282
283     @staticmethod
284     def do_poststep_preconfigure(self, guid):
285         # dummy hook for implementations interested in
286         # two-phase configuration
287         pass
288
289     def do_preconfigure(self):
290         self._do_in_factory_order(
291             'preconfigure_function',
292             self._metadata.preconfigure_order,
293             poststep = self.do_poststep_preconfigure )
294
295     @staticmethod
296     def do_poststep_configure(self, guid):
297         # dummy hook for implementations interested in
298         # two-phase configuration
299         pass
300
301     def do_configure(self):
302         self._do_in_factory_order(
303             'configure_function',
304             self._metadata.configure_order,
305             poststep = self.do_poststep_configure )
306         self._status = TS.STATUS_CONFIGURED
307
308     def do_prestart(self):
309         self._do_in_factory_order(
310             'prestart_function',
311             self._metadata.prestart_order )
312
313     def _do_cross_connect(self, cross_data, init = True):
314         for guid, cross_connections in self._cross_connect.iteritems():
315             factory = self._get_factory(guid)
316             for connector_type_name, cross_connection in \
317                     cross_connections.iteritems():
318                 connector_type = factory.connector_type(connector_type_name)
319                 (cross_guid, cross_testbed_guid, cross_testbed_id,
320                     cross_factory_id, cross_connector_type_name) = cross_connection
321                 if init:
322                     connect_code = connector_type.connect_to_init_code(
323                         cross_testbed_id, cross_factory_id, 
324                         cross_connector_type_name,
325                         True)
326                 else:
327                     connect_code = connector_type.connect_to_compl_code(
328                         cross_testbed_id, cross_factory_id, 
329                         cross_connector_type_name,
330                         True)
331                 if connect_code:
332                     if hasattr(connect_code, "func"):
333                         func_name = connect_code.func.__name__
334                     elif hasattr(connect_code, "__name__"):
335                         func_name = connect_code.__name__
336                     else:
337                         func_name = repr(connect_code)
338                     self._logger.debug("Cross-connect - guid: %d, connect_code: %s " % (
339                         guid, func_name))
340                     elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
341                     connect_code(self, guid, elem_cross_data)       
342
343     def do_cross_connect_init(self, cross_data):
344         self._do_cross_connect(cross_data)
345
346     def do_cross_connect_compl(self, cross_data):
347         self._do_cross_connect(cross_data, init = False)
348         self._status = TS.STATUS_CROSS_CONNECTED
349
350     def set(self, guid, name, value, time = TIME_NOW):
351         self._validate_guid(guid)
352         self._validate_box_attribute(guid, name)
353         self._validate_box_value(guid, name, value)
354         self._validate_modify_box_value(guid, name)
355         if guid not in self._set:
356             self._set[guid] = dict()
357             self._setlog[guid] = dict()
358         if time not in self._setlog[guid]:
359             self._setlog[guid][time] = dict()
360         self._setlog[guid][time][name] = value
361         self._set[guid][name] = value
362
363     def get(self, guid, name, time = TIME_NOW):
364         """
365         gets an attribute from box definitions if available. 
366         Throws KeyError if the GUID wasn't created
367         through the defer_create interface, and AttributeError if the
368         attribute isn't available (doesn't exist or is design-only)
369         """
370         self._validate_guid(guid)
371         self._validate_box_attribute(guid, name)
372         if guid in self._set and name in self._set[guid]:
373             return self._set[guid][name]
374         if guid in self._create_set and name in self._create_set[guid]:
375             return self._create_set[guid][name]
376         # if nothing else found, returns the factory default value
377         factory = self._get_factory(guid)
378         return factory.box_attributes.get_attribute_value(name)
379
380     def get_route(self, guid, index, attribute):
381         """
382         returns information given to defer_add_route.
383         
384         Raises AttributeError if an invalid attribute is requested
385             or if the indexed routing rule does not exist.
386         
387         Raises KeyError if the GUID has not been seen by
388             defer_add_route
389         """
390         ATTRIBUTES = ['Destination', 'NetPrefix', 'NextHop']
391         
392         if attribute not in ATTRIBUTES:
393             raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
394         
395         attribute_index = ATTRIBUTES.index(attribute)
396         
397         routes = self._add_route.get(guid)
398         if not routes:
399             raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
400        
401         index = int(index)
402         if not (0 <= index < len(addresses)):
403             raise AttributeError, "GUID %r at %s does not have a routing entry #%s" % (
404                 guid, self._testbed_id, index)
405         
406         return routes[index][attribute_index]
407
408     def get_address(self, guid, index, attribute='Address'):
409         """
410         returns information given to defer_add_address
411         
412         Raises AttributeError if an invalid attribute is requested
413             or if the indexed routing rule does not exist.
414         
415         Raises KeyError if the GUID has not been seen by
416             defer_add_address
417         """
418         ATTRIBUTES = ['Address', 'NetPrefix', 'Broadcast']
419         
420         if attribute not in ATTRIBUTES:
421             raise AttributeError, "Attribute %r invalid for addresses of %r" % (attribute, guid)
422         
423         attribute_index = ATTRIBUTES.index(attribute)
424         
425         addresses = self._add_address.get(guid)
426         if not addresses:
427             raise KeyError, "GUID %r not found in %s" % (guid, self._testbed_id)
428         
429         index = int(index)
430         if not (0 <= index < len(addresses)):
431             raise AttributeError, "GUID %r at %s does not have an address #%s" % (
432                 guid, self._testbed_id, index)
433         
434         return addresses[index][attribute_index]
435
436     def get_attribute_list(self, guid, filter_flags = None, exclude = False):
437         factory = self._get_factory(guid)
438         attribute_list = list()
439         return factory.box_attributes.get_attribute_list(filter_flags, exclude)
440
441     def get_factory_id(self, guid):
442         factory = self._get_factory(guid)
443         return factory.factory_id
444
445     def start(self, time = TIME_NOW):
446         self._do_in_factory_order(
447             'start_function',
448             self._metadata.start_order )
449         self._status = TS.STATUS_STARTED
450
451     #action: NotImplementedError
452
453     def stop(self, time = TIME_NOW):
454         self._do_in_factory_order(
455             'stop_function',
456             reversed(self._metadata.start_order) )
457         self._status = TS.STATUS_STOPPED
458
459     def status(self, guid = None):
460         if not guid:
461             return self._status
462         self._validate_guid(guid)
463         factory = self._get_factory(guid)
464         status_function = factory.status_function
465         if status_function:
466             return status_function(self, guid)
467         return AS.STATUS_UNDETERMINED
468     
469     def testbed_status(self):
470         return self._status
471
472     def trace(self, guid, trace_id, attribute='value'):
473         if attribute == 'value':
474             fd = open("%s" % self.trace_filepath(guid, trace_id), "r")
475             content = fd.read()
476             fd.close()
477         elif attribute == 'path':
478             content = self.trace_filepath(guid, trace_id)
479         elif attribute == 'filename':
480             content = self.trace_filename(guid, trace_id)
481         else:
482             content = None
483         return content
484
485     def traces_info(self):
486         traces_info = dict()
487         host = self._attributes.get_attribute_value("deployment_host")
488         user = self._attributes.get_attribute_value("deployment_user")
489         for guid, trace_list in self._add_trace.iteritems(): 
490             traces_info[guid] = dict()
491             for trace_id in trace_list:
492                 traces_info[guid][trace_id] = dict()
493                 filepath = self.trace(guid, trace_id, attribute = "path")
494                 traces_info[guid][trace_id]["host"] = host
495                 traces_info[guid][trace_id]["user"] = user
496                 traces_info[guid][trace_id]["filepath"] = filepath
497         return traces_info
498
499     def trace_filepath(self, guid, trace_id):
500         """
501         Return a trace's file path, for TestbedController's default 
502         implementation of trace()
503         """
504         raise NotImplementedError
505
506     def trace_filename(self, guid, trace_id):
507         """
508         Return a trace's file name, for TestbedController's default 
509         implementation of trace()
510         """
511         raise NotImplementedError
512
513     #shutdown: NotImplementedError
514
515     def get_connected(self, guid, connector_type_name, 
516             other_connector_type_name):
517         """searchs the connected elements for the specific connector_type_name 
518         pair"""
519         if guid not in self._connect:
520             return []
521         # all connections for all connectors for guid
522         all_connections = self._connect[guid]
523         if connector_type_name not in all_connections:
524             return []
525         # all connections for the specific connector
526         connections = all_connections[connector_type_name]
527         specific_connections = [otr_guid for otr_guid, otr_connector_type_name \
528                 in connections.iteritems() if \
529                 otr_connector_type_name == other_connector_type_name]
530         return specific_connections
531
532     def _get_connection_count(self, guid, connection_type_name):
533         count = 0
534         cross_count = 0
535         if guid in self._connect and connection_type_name in \
536                 self._connect[guid]:
537             count = len(self._connect[guid][connection_type_name])
538         if guid in self._cross_connect and connection_type_name in \
539                 self._cross_connect[guid]:
540             cross_count = len(self._cross_connect[guid][connection_type_name])
541         return count + cross_count
542
543     def _get_traces(self, guid):
544         return [] if guid not in self._add_trace else self._add_trace[guid]
545
546     def _get_parameters(self, guid):
547         return dict() if guid not in self._create_set else \
548                 self._create_set[guid]
549
550     def _get_factory(self, guid):
551         factory_id = self._create[guid]
552         return self._factories[factory_id]
553
554     def _get_factory_id(self, guid):
555         """ Returns the factory ID of the (perhaps not yet) created object """
556         return self._create.get(guid, None)
557
558     def _validate_guid(self, guid):
559         if not guid in self._create:
560             raise RuntimeError("Element guid %d doesn't exist" % guid)
561
562     def _validate_not_guid(self, guid):
563         if guid in self._create:
564             raise AttributeError("Cannot add elements with the same guid: %d" %
565                     guid)
566
567     def _validate_factory_id(self, factory_id):
568         if factory_id not in self._factories:
569             raise AttributeError("Invalid element type %s for testbed version %s" %
570                     (factory_id, self._testbed_version))
571
572     def _validate_testbed_attribute(self, name):
573         if not self._attributes.has_attribute(name):
574             raise AttributeError("Invalid testbed attribute %s for testbed" % \
575                     name)
576
577     def _validate_testbed_value(self, name, value):
578         if not self._attributes.is_attribute_value_valid(name, value):
579             raise AttributeError("Invalid value %r for testbed attribute %s" % \
580                 (value, name))
581
582     def _validate_box_attribute(self, guid, name):
583         factory = self._get_factory(guid)
584         if not factory.box_attributes.has_attribute(name):
585             raise AttributeError("Invalid attribute %s for element type %s" %
586                     (name, factory.factory_id))
587
588     def _validate_box_value(self, guid, name, value):
589         factory = self._get_factory(guid)
590         if not factory.box_attributes.is_attribute_value_valid(name, value):
591             raise AttributeError("Invalid value %r for attribute %s" % \
592                 (value, name))
593
594     def _validate_factory_attribute(self, guid, name):
595         factory = self._get_factory(guid)
596         if not factory.has_attribute(name):
597             raise AttributeError("Invalid attribute %s for element type %s" %
598                     (name, factory.factory_id))
599
600     def _validate_factory_value(self, guid, name, value):
601         factory = self._get_factory(guid)
602         if not factory.is_attribute_value_valid(name, value):
603             raise AttributeError("Invalid value %r for attribute %s" % \
604                 (value, name))
605
606     def _validate_trace(self, guid, trace_name):
607         factory = self._get_factory(guid)
608         if not trace_name in factory.traces_list:
609             raise RuntimeError("Element type '%s' has no trace '%s'" %
610                     (factory.factory_id, trace_name))
611
612     def _validate_allow_addresses(self, guid):
613         factory = self._get_factory(guid)
614         if not factory.allow_addresses:
615             raise RuntimeError("Element type '%s' doesn't support addresses" %
616                     factory.factory_id)
617         attr_name = "maxAddresses"
618         if guid in self._create_set and attr_name in self._create_set[guid]:
619             max_addresses = self._create_set[guid][attr_name]
620         else:
621             factory = self._get_factory(guid)
622             max_addresses = factory.box_attributes.get_attribute_value(attr_name)
623         if guid in self._add_address:
624             count_addresses = len(self._add_address[guid])
625             if max_addresses == count_addresses:
626                 raise RuntimeError("Element guid %d of type '%s' can't accept \
627                         more addresses" % (guid, factory.factory_id))
628
629     def _validate_allow_routes(self, guid):
630         factory = self._get_factory(guid)
631         if not factory.allow_routes:
632             raise RuntimeError("Element type '%s' doesn't support routes" %
633                     factory.factory_id)
634
635     def _validate_connection(self, guid1, connector_type_name1, guid2, 
636             connector_type_name2, cross = False):
637         # can't connect with self
638         if guid1 == guid2:
639             raise AttributeError("Can't connect guid %d to self" % \
640                 (guid1))
641         # the connection is already done, so ignore
642         connected = self.get_connected(guid1, connector_type_name1, 
643                 connector_type_name2)
644         if guid2 in connected:
645             return
646         count1 = self._get_connection_count(guid1, connector_type_name1)
647         factory1 = self._get_factory(guid1)
648         connector_type1 = factory1.connector_type(connector_type_name1)
649         if count1 == connector_type1.max:
650             raise AttributeError("Connector %s is full for guid %d" % \
651                 (connector_type_name1, guid1))
652
653     def _validate_modify_box_value(self, guid, name):
654         factory = self._get_factory(guid)
655         if self._status > TS.STATUS_STARTED and \
656                 (factory.box_attributes.is_attribute_exec_read_only(name) or \
657                 factory.box_attributes.is_attribute_exec_immutable(name)):
658             raise AttributeError("Attribute %s can only be modified during experiment design" % name)
659