9b7e5e870a5b206ed7e64216f82bf0ce98a22310
[nepi.git] / test / execution / resource.py
1 #!/usr/bin/env python
2 #
3 #    NEPI, a framework to manage network experiments
4 #    Copyright (C) 2013 INRIA
5 #
6 #    This program is free software: you can redistribute it and/or modify
7 #    it under the terms of the GNU General Public License as published by
8 #    the Free Software Foundation, either version 3 of the License, or
9 #    (at your option) any later version.
10 #
11 #    This program is distributed in the hope that it will be useful,
12 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
13 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 #    GNU General Public License for more details.
15 #
16 #    You should have received a copy of the GNU General Public License
17 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20
21
22 from nepi.execution.attribute import Attribute
23 from nepi.execution.ec import ExperimentController, FailureLevel 
24 from nepi.execution.resource import ResourceManager, ResourceState, \
25         clsinit_copy, ResourceAction
26
27 import random
28 import time
29 import unittest
30
@clsinit_copy
class MyResource(ResourceManager):
    """Toy resource type that registers one attribute of its own.

    Used by ResourceFactoryTestCase to check that attributes registered
    on a subclass are counted in addition to the ones it gets from
    ResourceManager.
    """
    _rtype = "MyResource"

    @classmethod
    def _register_attributes(cls):
        """Register the ``my_attr`` attribute on this resource class."""
        cls._register_attribute(
                Attribute("my_attr", "is a really nice attribute!"))

    def __init__(self, ec, guid):
        """Forward the controller ``ec`` and ``guid`` to ResourceManager."""
        super(MyResource, self).__init__(ec, guid)
42
@clsinit_copy
class AnotherResource(ResourceManager):
    """Toy resource type that registers no attribute of its own.

    ResourceFactoryTestCase expects it to expose the same number of
    attributes as the base ResourceManager.
    """
    _rtype = "AnotherResource"

    def __init__(self, ec, guid):
        """Forward the controller ``ec`` and ``guid`` to ResourceManager."""
        super(AnotherResource, self).__init__(ec, guid)
49
class Channel(ResourceManager):
    """Toy channel resource: deploys without waiting for any other resource."""
    _rtype = "Channel"

    def __init__(self, ec, guid):
        """Forward the controller ``ec`` and ``guid`` to ResourceManager."""
        super(Channel, self).__init__(ec, guid)

    def do_deploy(self):
        """Sleep for one second, then run the base class deploy."""
        deploy_delay = 1
        time.sleep(deploy_delay)

        super(Channel, self).do_deploy()
        self.logger.debug(" -------- DEPLOYED ------- ")
60        
class Interface(ResourceManager):
    """Toy interface resource connected to one Node and one Channel.

    Its deploy is postponed while the Node is below PROVISIONED or the
    Channel is below READY.
    """
    _rtype = "Interface"

    def __init__(self, ec, guid):
        """Forward the controller ``ec`` and ``guid`` to ResourceManager."""
        super(Interface, self).__init__(ec, guid)

    def do_deploy(self):
        """Deploy once the connected Node and Channel are far enough along.

        Only the first connected Node and the first connected Channel are
        inspected. While either one lags behind, ``self.deploy`` is
        re-scheduled on the controller with a "0.5s" delay.
        """
        node = self.get_connected(Node.get_rtype())[0]
        chan = self.get_connected(Channel.get_rtype())[0]

        # The channel state is only looked at when the node is already
        # provisioned (short-circuit evaluation).
        must_wait = (node.state < ResourceState.PROVISIONED
                or chan.state < ResourceState.READY)
        if must_wait:
            self.ec.schedule("0.5s", self.deploy)
            return

        time.sleep(2)
        super(Interface, self).do_deploy()
        self.logger.debug(" -------- DEPLOYED ------- ")
79
class Node(ResourceManager):
    """Toy node resource that deploys in two steps.

    It first discovers and provisions itself, and only completes its
    deploy once none of its connected Interfaces is below READY.
    """
    _rtype = "Node"

    def __init__(self, ec, guid):
        """Forward the controller ``ec`` and ``guid`` to ResourceManager."""
        super(Node, self).__init__(ec, guid)

    def do_deploy(self):
        """Advance the node one step towards being deployed.

        - In state NEW: discover, provision and re-schedule ``self.deploy``
          with a "1s" delay.
        - In state PROVISIONED: re-schedule ``self.deploy`` with a "0.5s"
          delay while any connected Interface is below READY, otherwise
          run the base class deploy.
        - In any other state nothing is done.
        """
        if self.state == ResourceState.NEW:
            self.do_discover()
            self.do_provision()
            self.logger.debug(" -------- PROVISIONED ------- ")
            self.ec.schedule("1s", self.deploy)
        elif self.state == ResourceState.PROVISIONED:
            interfaces = self.get_connected(Interface.get_rtype())

            # any() stops at the first interface that is not ready yet.
            if any(iface.state < ResourceState.READY for iface in interfaces):
                self.ec.schedule("0.5s", self.deploy)
                return

            super(Node, self).do_deploy()
            self.logger.debug(" -------- DEPLOYED ------- ")
101
class Application(ResourceManager):
    """Toy application resource running on one Node.

    It deploys only after its Node is READY and schedules its own stop
    shortly after being started.
    """
    _rtype = "Application"

    def __init__(self, ec, guid):
        """Forward the controller ``ec`` and ``guid`` to ResourceManager."""
        super(Application, self).__init__(ec, guid)

    def do_deploy(self):
        """Deploy after a random pause, once the connected Node is READY.

        While the first connected Node is below READY, ``self.deploy`` is
        re-scheduled on the controller with a "0.5s" delay.
        """
        connected_nodes = self.get_connected(Node.get_rtype())
        node = connected_nodes[0]

        if node.state < ResourceState.READY:
            self.ec.schedule("0.5s", self.deploy)
            return

        # Random pause of up to two seconds before the actual deploy.
        pause = random.random() * 2
        time.sleep(pause)
        super(Application, self).do_deploy()
        self.logger.debug(" -------- DEPLOYED ------- ")

    def do_start(self):
        """Start, pause for up to three seconds, then schedule ``self.stop``."""
        super(Application, self).do_start()

        pause = random.random() * 3
        time.sleep(pause)
        self.ec.schedule("0.5s", self.stop)
121
class ErrorApplication(ResourceManager):
    """Toy application resource whose deploy always ends in an error.

    Used by the tests to check how the ExperimentController reacts to a
    resource that fails during deployment.
    """
    _rtype = "ErrorApplication"

    def __init__(self, ec, guid):
        """Forward the controller ``ec`` and ``guid`` to ResourceManager."""
        super(ErrorApplication, self).__init__(ec, guid)

    def do_deploy(self):
        """Wait for the connected Node to be READY, then fail on purpose.

        While the first connected Node is below READY, ``self.deploy`` is
        re-scheduled on the controller with a "0.5s" delay.

        Raises:
            RuntimeError: always, once the Node is READY.
        """
        node = self.get_connected(Node.get_rtype())[0]
        if node.state < ResourceState.READY:
            self.ec.schedule("0.5s", self.deploy)
        else:
            time.sleep(random.random() * 2)
            # Call form of raise: valid on both Python 2 and Python 3,
            # unlike the Python-2-only ``raise RuntimeError, "..."``.
            raise RuntimeError("NOT A REAL ERROR. JUST TESTING")
135
class ResourceFactoryTestCase(unittest.TestCase):
    """Checks registration of resource types in the ResourceFactory."""

    def test_add_resource_factory(self):
        """Register two toy types in an emptied factory and inspect them.

        The factory registry is replaced by an empty dict for the duration
        of the test and re-populated afterwards, even when an assertion
        fails, so that other tests never see the emptied registry.
        """
        from nepi.execution.resource import ResourceFactory
        from nepi.execution.resource import populate_factory

        ResourceFactory._resource_types = dict()
        try:
            ResourceFactory.register_type(MyResource)
            ResourceFactory.register_type(AnotherResource)

            # Take into account default 'Critical' attribute
            self.assertEqual(MyResource.get_rtype(), "MyResource")
            self.assertEqual(len(MyResource._attributes), 3)

            self.assertEqual(ResourceManager.get_rtype(), "Resource")
            self.assertEqual(len(ResourceManager._attributes), 2)

            self.assertEqual(AnotherResource.get_rtype(), "AnotherResource")
            self.assertEqual(len(AnotherResource._attributes), 2)

            self.assertEqual(len(ResourceFactory.resource_types()), 2)
        finally:
            # restore factory state for other tests
            ResourceFactory._resource_types = dict()
            populate_factory()
160
class ResourceManagerTestCase(unittest.TestCase):
    """Exercises ResourceManager behaviour through an ExperimentController.

    The tests use the toy resources defined in this module (Application,
    ErrorApplication, Node, Interface, Channel) and cover conditions,
    deploy ordering, concurrency and failure handling.
    """

    @staticmethod
    def _register_types(*rm_classes):
        """Register every given resource class in the ResourceFactory."""
        from nepi.execution.resource import ResourceFactory

        for rm_class in rm_classes:
            ResourceFactory.register_type(rm_class)

    @staticmethod
    def _start_condition_guids(rm):
        """Return, in order, the guids of all START condition groups of rm."""
        guids = []
        # Names are prefixed with "_" where unused; in particular the third
        # item must not be called "time", which would shadow the module.
        for (group, _state, _delay) in rm.conditions.get(ResourceAction.START):
            guids.extend(group)
        return guids

    def test_register_condition(self):
        """Registered START conditions can be listed and partly removed."""
        ec = ExperimentController()
        rm = ResourceManager(ec, 15)

        rm.register_condition(ResourceAction.START, [1, 3, 5, 7],
                ResourceState.STARTED)

        rm.register_condition(ResourceAction.START,
                [10, 8], ResourceState.STARTED, time="10s")

        self.assertEqual(self._start_condition_guids(rm),
                [1, 3, 5, 7, 10, 8])

        # Guids 2, 4 and 6 were never registered; only 1 and 3 can go away.
        rm.unregister_condition([1, 2, 3, 4, 6])

        self.assertEqual(self._start_condition_guids(rm), [5, 7, 10, 8])

    def test_deploy_in_order(self):
        """
        Test scenario: 2 applications running one on 1 node each. 
        Nodes are connected to Interfaces which are connected
        through a channel between them.

         - Application needs to wait until Node is ready to be ready
         - Node needs to wait until Interface is ready to be ready
         - Interface needs to wait until Node is provisioned to be ready
         - Interface needs to wait until Channel is ready to be ready
         - The channel doesn't wait for any other resource to be ready

        """
        self._register_types(Application, Node, Interface, Channel)

        ec = ExperimentController()

        app1 = ec.register_resource("Application")
        app2 = ec.register_resource("Application")
        node1 = ec.register_resource("Node")
        node2 = ec.register_resource("Node")
        iface1 = ec.register_resource("Interface")
        iface2 = ec.register_resource("Interface")
        chan = ec.register_resource("Channel")

        ec.register_connection(app1, node1)
        ec.register_connection(app2, node2)
        ec.register_connection(iface1, node1)
        ec.register_connection(iface2, node2)
        ec.register_connection(iface1, chan)
        ec.register_connection(iface2, chan)

        # Shut the controller down even if deploying or waiting fails.
        try:
            ec.deploy()
            ec.wait_finished([app1, app2])
        finally:
            ec.shutdown()

        rmapp1 = ec.get_resource(app1)
        rmapp2 = ec.get_resource(app2)
        rmnode1 = ec.get_resource(node1)
        rmnode2 = ec.get_resource(node2)
        rmiface1 = ec.get_resource(iface1)
        rmiface2 = ec.get_resource(iface2)
        rmchan = ec.get_resource(chan)

        ## Validate deploy order
        # - Application needs to wait until Node is ready to be ready
        self.assertLess(rmnode1.ready_time, rmapp1.ready_time)
        self.assertLess(rmnode2.ready_time, rmapp2.ready_time)

        # - Node needs to wait until Interface is ready to be ready
        self.assertGreater(rmnode1.ready_time, rmiface1.ready_time)
        self.assertGreater(rmnode2.ready_time, rmiface2.ready_time)

        # - Interface needs to wait until Node is provisioned to be ready
        self.assertLess(rmnode1.provision_time, rmiface1.ready_time)
        self.assertLess(rmnode2.provision_time, rmiface2.ready_time)

        # - Interface needs to wait until Channel is ready to be ready
        self.assertLess(rmchan.ready_time, rmiface1.ready_time)
        self.assertLess(rmchan.ready_time, rmiface2.ready_time)

    def test_concurrency(self):
        """1000 applications on one node all finish in state STOPPED."""
        self._register_types(Application, Node, Interface, Channel)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        apps = []
        for _ in range(1000):
            app = ec.register_resource("Application")
            ec.register_connection(app, node)
            apps.append(app)

        # Shut the controller down even if an assertion fails.
        try:
            ec.deploy()

            ec.wait_finished(apps)

            self.assertEqual(ec.state(node), ResourceState.STARTED)
            for guid in apps:
                self.assertEqual(ec.state(guid), ResourceState.STOPPED)
        finally:
            ec.shutdown()

    def test_exception(self):
        """An ErrorApplication failing to deploy yields RM_FAILURE."""
        self._register_types(ErrorApplication, Node, Interface)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        app = ec.register_resource("ErrorApplication")
        ec.register_connection(app, node)

        try:
            ec.deploy()
            ec.wait_finished(app)
        finally:
            ec.shutdown()

        self.assertEqual(ec._fm._failure_level, FailureLevel.RM_FAILURE)

    def test_critical(self):
        """A failing resource with "critical" set to False is tolerated.

        The ErrorApplication must end up FAILED, the ten regular
        applications STOPPED, and the failure level must stay OK.
        """
        self._register_types(ErrorApplication, Application, Node, Interface)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        eapp = ec.register_resource("ErrorApplication")
        ec.set(eapp, "critical", False)
        ec.register_connection(eapp, node)

        apps = []
        for _ in range(10):
            app = ec.register_resource("Application")
            ec.register_connection(app, node)
            apps.append(app)

        # Shut the controller down even if an assertion fails.
        try:
            ec.deploy()

            # The failing application is waited for first, as before.
            ec.wait_finished([eapp] + apps)

            self.assertEqual(ec.state(eapp), ResourceState.FAILED)

            for app in apps:
                self.assertEqual(ec.state(app), ResourceState.STOPPED)
        finally:
            ec.shutdown()

        self.assertEqual(ec._fm._failure_level, FailureLevel.OK)

    def test_start_with_condition(self):
        """app2, conditioned on app1 being STARTED, starts after app1."""
        self._register_types(Application, Node, Interface)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        app1 = ec.register_resource("Application")
        ec.register_connection(app1, node)

        app2 = ec.register_resource("Application")
        ec.register_connection(app2, node)

        ec.register_condition(app2, ResourceAction.START, app1,
                ResourceState.STARTED, time="5s")

        # Shut the controller down even if an assertion fails.
        try:
            ec.deploy()

            ec.wait_finished([app1, app2])

            rmapp1 = ec.get_resource(app1)
            rmapp2 = ec.get_resource(app2)

            self.assertGreater(rmapp2.start_time, rmapp1.start_time)
        finally:
            ec.shutdown()

    def test_stop_with_condition(self):
        """app2, conditioned on app1 being STOPPED, starts after app1 stops."""
        self._register_types(Application, Node, Interface)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        app1 = ec.register_resource("Application")
        ec.register_connection(app1, node)

        app2 = ec.register_resource("Application")
        ec.register_connection(app2, node)

        ec.register_condition(app2, ResourceAction.START, app1,
                ResourceState.STOPPED)

        # Shut the controller down even if an assertion fails.
        try:
            ec.deploy()

            ec.wait_finished([app1, app2])

            rmapp1 = ec.get_resource(app1)
            rmapp2 = ec.get_resource(app2)

            self.assertGreater(rmapp2.start_time, rmapp1.stop_time)
        finally:
            ec.shutdown()

    def ztest_set_with_condition(self):
        """Placeholder; the name does not start with "test", so unittest
        does not collect it."""
        # TODO!!!
        pass
418
419
if __name__ == "__main__":
    # Run all test cases of this module when executed as a script.
    unittest.main()
422