#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import random
import time
import unittest

from nepi.execution.attribute import Attribute
from nepi.execution.ec import ExperimentController, FailureLevel
from nepi.execution.resource import ResourceManager, ResourceState, \
        clsinit_copy, ResourceAction
# NOTE(review): the decorator, the ``_rtype`` value and ``@classmethod`` are
# not visible in the extract this file was recovered from. They are inferred
# from the otherwise unused ``clsinit_copy`` import, from the test asserting
# ``MyResource.get_rtype() == "MyResource"`` and from the ``cls`` parameter
# of ``_register_attributes`` -- confirm against upstream.
@clsinit_copy
class MyResource(ResourceManager):
    """Minimal resource used to exercise attribute registration."""

    _rtype = "MyResource"

    @classmethod
    def _register_attributes(cls):
        """Register the single extra attribute ``my_attr`` on this class."""
        cool_attr = Attribute("my_attr", "is a really nice attribute!")
        cls._register_attribute(cool_attr)

    def __init__(self, ec, guid):
        super(MyResource, self).__init__(ec, guid)
# NOTE(review): the ``clsinit_copy`` decorator is imported by this module but
# is not shown applied anywhere in the recovered extract; it is presumed to
# belong on every test resource class -- confirm against upstream.
@clsinit_copy
class AnotherResource(ResourceManager):
    """Resource type that adds nothing beyond its own ``_rtype`` name."""

    _rtype = "AnotherResource"

    def __init__(self, ec, guid):
        super(AnotherResource, self).__init__(ec, guid)
# NOTE(review): the decorator, ``_rtype`` and the ``do_deploy`` header are
# missing from the recovered extract. ``_rtype`` is inferred from the tests
# calling ``ec.register_resource("Channel")``; the method name is inferred
# from the ``super(Channel, self).do_deploy()`` call -- confirm upstream.
@clsinit_copy
class Channel(ResourceManager):
    """Resource that deploys without waiting on any other resource."""

    _rtype = "Channel"

    def __init__(self, ec, guid):
        super(Channel, self).__init__(ec, guid)

    def do_deploy(self):
        """Deploy immediately and log the transition."""
        super(Channel, self).do_deploy()
        self.logger.debug(" -------- DEPLOYED ------- ")
# NOTE(review): the decorator, ``_rtype``, the ``do_deploy`` header and the
# ``else:`` branch are missing from the recovered extract. ``_rtype`` is
# inferred from ``ec.register_resource("Interface")`` in the tests; the
# ``else:`` is required so a re-scheduled deploy does not also complete
# immediately -- confirm against upstream.
@clsinit_copy
class Interface(ResourceManager):
    """Resource whose deployment depends on its Node and its Channel."""

    _rtype = "Interface"

    def __init__(self, ec, guid):
        super(Interface, self).__init__(ec, guid)

    def do_deploy(self):
        """Deploy once the Node is PROVISIONED and the Channel is READY.

        While either dependency has not reached the required state the
        deploy is re-scheduled on the controller 0.5s later.
        """
        node = self.get_connected(Node.get_rtype())[0]
        chan = self.get_connected(Channel.get_rtype())[0]

        if node.state < ResourceState.PROVISIONED:
            self.ec.schedule("0.5s", self.deploy)
        elif chan.state < ResourceState.READY:
            self.ec.schedule("0.5s", self.deploy)
        else:
            super(Interface, self).do_deploy()
            self.logger.debug(" -------- DEPLOYED ------- ")
# NOTE(review): the decorator, ``_rtype``, the ``do_deploy`` header, the
# interface loop and its early ``return`` are missing from the recovered
# extract and have been inferred from the surrounding statements and from
# ``ec.register_resource("Node")`` in the tests -- confirm against upstream.
@clsinit_copy
class Node(ResourceManager):
    """Resource deployed in two steps: provision, then wait for interfaces."""

    _rtype = "Node"

    def __init__(self, ec, guid):
        super(Node, self).__init__(ec, guid)

    def do_deploy(self):
        """Provision on the first call; deploy once all interfaces are READY.

        A NEW node is provisioned and its deploy re-scheduled 1s later. A
        PROVISIONED node re-schedules itself every 0.5s until every connected
        Interface has reached READY, then completes its own deployment.
        """
        if self.state == ResourceState.NEW:
            # NOTE(review): the calls that advance a NEW resource to
            # PROVISIONED are not visible in the extract; the standard
            # ResourceManager hooks are presumed here -- confirm upstream.
            self.do_discover()
            self.do_provision()
            self.logger.debug(" -------- PROVISIONED ------- ")
            self.ec.schedule("1s", self.deploy)
        elif self.state == ResourceState.PROVISIONED:
            ifaces = self.get_connected(Interface.get_rtype())
            if any(rm.state < ResourceState.READY for rm in ifaces):
                # At least one interface is still pending: retry later
                # instead of falling through to the actual deploy.
                self.ec.schedule("0.5s", self.deploy)
                return

            super(Node, self).do_deploy()
            self.logger.debug(" -------- DEPLOYED ------- ")
# NOTE(review): the decorator, both method headers and the ``else:`` branch
# are missing from the recovered extract; the method names are inferred from
# the ``super(...).do_deploy()`` / ``super(...).do_start()`` calls they
# contain -- confirm against upstream.
@clsinit_copy
class Application(ResourceManager):
    """Resource that deploys after its Node and stops itself once started."""

    _rtype = "Application"

    def __init__(self, ec, guid):
        super(Application, self).__init__(ec, guid)

    def do_deploy(self):
        """Deploy once the connected Node is READY, else retry in 0.5s."""
        node = self.get_connected(Node.get_rtype())[0]
        if node.state < ResourceState.READY:
            self.ec.schedule("0.5s", self.deploy)
        else:
            # Random delay (up to 2s) so concurrent applications do not
            # all become ready at the same instant.
            time.sleep(random.random() * 2)
            super(Application, self).do_deploy()
            self.logger.debug(" -------- DEPLOYED ------- ")

    def do_start(self):
        """Start, sleep for a random time (up to 3s), then schedule stop."""
        super(Application, self).do_start()
        time.sleep(random.random() * 3)
        self.ec.schedule("0.5s", self.stop)
# NOTE(review): the decorator, the ``do_deploy`` header and the ``else:``
# branch are missing from the recovered extract; the method name is inferred
# from the sibling Application class, whose deploy logic this mirrors --
# confirm against upstream.
@clsinit_copy
class ErrorApplication(ResourceManager):
    """Resource whose deployment always fails, used to test failure handling."""

    _rtype = "ErrorApplication"

    def __init__(self, ec, guid):
        super(ErrorApplication, self).__init__(ec, guid)

    def do_deploy(self):
        """Wait for the Node to be READY, then raise a deliberate error.

        Raises:
            RuntimeError: always, once the connected Node is READY.
        """
        node = self.get_connected(Node.get_rtype())[0]
        if node.state < ResourceState.READY:
            self.ec.schedule("0.5s", self.deploy)
        else:
            time.sleep(random.random() * 2)
            # Call form of raise: valid on Python 2 and Python 3 alike.
            raise RuntimeError("NOT A REAL ERROR. JUST TESTING")
class ResourceFactoryTestCase(unittest.TestCase):
    """Checks registration of resource types in the ResourceFactory."""

    def test_add_resource_factory(self):
        """Registered types expose their rtype and their own attribute set."""
        from nepi.execution.resource import ResourceFactory

        # Start from an empty registry so the final count is deterministic.
        ResourceFactory._resource_types = dict()
        ResourceFactory.register_type(MyResource)
        ResourceFactory.register_type(AnotherResource)

        self.assertEqual(MyResource.get_rtype(), "MyResource")
        self.assertEqual(len(MyResource._attributes), 2)

        self.assertEqual(ResourceManager.get_rtype(), "Resource")
        self.assertEqual(len(ResourceManager._attributes), 1)

        self.assertEqual(AnotherResource.get_rtype(), "AnotherResource")
        self.assertEqual(len(AnotherResource._attributes), 1)

        self.assertEqual(len(ResourceFactory.resource_types()), 2)

        # restore factory state for other tests
        from nepi.execution.resource import populate_factory
        ResourceFactory._resource_types = dict()
        # NOTE(review): the call itself is not visible in the recovered
        # extract; it is inferred from the import and the comment above.
        populate_factory()
class ResourceManagerTestCase(unittest.TestCase):
    """Exercises conditions, deploy ordering and failure handling of RMs.

    NOTE(review): this class was recovered from an extract with missing
    lines. Statements tagged ``# inferred`` are not visible in that extract
    and were reconstructed from the names and assertions that depend on
    them -- confirm each one against upstream.
    """

    def test_register_condition(self):
        """Conditions accumulate per action and can be unregistered by guid."""
        ec = ExperimentController()
        rm = ResourceManager(ec, 15)

        group = [1, 3, 5, 7]  # inferred from the first assertion below
        rm.register_condition(ResourceAction.START, group,
                ResourceState.STARTED)

        group = [10, 8]  # inferred from the first assertion below
        rm.register_condition(ResourceAction.START,
                group, ResourceState.STARTED, time="10s")

        waiting_for = []  # inferred
        conditions = rm.conditions.get(ResourceAction.START)
        # Local names chosen so the ``time`` module is not shadowed.
        for (members, _state, _delay) in conditions:
            waiting_for.extend(members)

        self.assertEqual(waiting_for, [1, 3, 5, 7, 10, 8])

        group = [1, 2, 3, 4, 6]
        rm.unregister_condition(group)

        waiting_for = []  # inferred
        conditions = rm.conditions.get(ResourceAction.START)
        for (members, _state, _delay) in conditions:
            waiting_for.extend(members)

        self.assertEqual(waiting_for, [5, 7, 10, 8])

    def test_deploy_in_order(self):
        """
        Test scenario: 2 applications running one on 1 node each.
        Nodes are connected to Interfaces which are connected
        through a channel between them.

         - Application needs to wait until Node is ready to be ready
         - Node needs to wait until Interface is ready to be ready
         - Interface needs to wait until Node is provisioned to be ready
         - Interface needs to wait until Channel is ready to be ready
         - The channel doesn't wait for any other resource to be ready
        """
        from nepi.execution.resource import ResourceFactory

        ResourceFactory.register_type(Application)
        ResourceFactory.register_type(Node)
        ResourceFactory.register_type(Interface)
        ResourceFactory.register_type(Channel)

        ec = ExperimentController()

        app1 = ec.register_resource("Application")
        app2 = ec.register_resource("Application")
        node1 = ec.register_resource("Node")
        node2 = ec.register_resource("Node")
        iface1 = ec.register_resource("Interface")
        iface2 = ec.register_resource("Interface")
        chan = ec.register_resource("Channel")

        ec.register_connection(app1, node1)
        ec.register_connection(app2, node2)
        ec.register_connection(iface1, node1)
        ec.register_connection(iface2, node2)
        ec.register_connection(iface1, chan)
        ec.register_connection(iface2, chan)

        ec.deploy()  # inferred

        guids = [app1, app2]  # inferred
        ec.wait_finished(guids)

        ec.shutdown()  # inferred

        rmapp1 = ec.get_resource(app1)
        rmapp2 = ec.get_resource(app2)
        rmnode1 = ec.get_resource(node1)
        rmnode2 = ec.get_resource(node2)
        rmiface1 = ec.get_resource(iface1)
        rmiface2 = ec.get_resource(iface2)
        rmchan = ec.get_resource(chan)

        ## Validate deploy order
        # - Application needs to wait until Node is ready to be ready
        self.assertTrue(rmnode1.ready_time < rmapp1.ready_time)
        self.assertTrue(rmnode2.ready_time < rmapp2.ready_time)

        # - Node needs to wait until Interface is ready to be ready
        self.assertTrue(rmnode1.ready_time > rmiface1.ready_time)
        self.assertTrue(rmnode2.ready_time > rmiface2.ready_time)

        # - Interface needs to wait until Node is provisioned to be ready
        self.assertTrue(rmnode1.provision_time < rmiface1.ready_time)
        self.assertTrue(rmnode2.provision_time < rmiface2.ready_time)

        # - Interface needs to wait until Channel is ready to be ready
        self.assertTrue(rmchan.ready_time < rmiface1.ready_time)
        self.assertTrue(rmchan.ready_time < rmiface2.ready_time)

    def test_concurrency(self):
        """1000 applications on one node must all end up STOPPED."""
        from nepi.execution.resource import ResourceFactory

        ResourceFactory.register_type(Application)
        ResourceFactory.register_type(Node)
        ResourceFactory.register_type(Interface)
        ResourceFactory.register_type(Channel)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        apps = []  # inferred
        for _ in range(1000):
            app = ec.register_resource("Application")
            ec.register_connection(app, node)
            apps.append(app)  # inferred

        ec.deploy()  # inferred

        ec.wait_finished(apps)

        self.assertEqual(ec.state(node), ResourceState.STARTED)
        # The assertTrue wrapper and the ``for guid in apps`` tail are
        # inferred; only the start of the expression is visible.
        self.assertTrue(
            all(ec.state(guid) == ResourceState.STOPPED for guid in apps))

        ec.shutdown()  # inferred

    def test_exception(self):
        """A failing ErrorApplication sets the failure level to RM_FAILURE."""
        from nepi.execution.resource import ResourceFactory

        ResourceFactory.register_type(ErrorApplication)
        ResourceFactory.register_type(Node)
        ResourceFactory.register_type(Interface)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        app = ec.register_resource("ErrorApplication")
        ec.register_connection(app, node)

        ec.deploy()  # inferred

        ec.wait_finished(app)

        ec.shutdown()  # inferred

        self.assertEqual(ec._fm._failure_level, FailureLevel.RM_FAILURE)

    def test_critical(self):
        """A failure in a non-critical RM leaves the failure level at OK."""
        from nepi.execution.resource import ResourceFactory

        ResourceFactory.register_type(ErrorApplication)
        ResourceFactory.register_type(Application)
        ResourceFactory.register_type(Node)
        ResourceFactory.register_type(Interface)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        eapp = ec.register_resource("ErrorApplication")
        ec.set(eapp, "critical", False)
        ec.register_connection(eapp, node)

        app = ec.register_resource("Application")
        ec.register_connection(app, node)

        # NOTE(review): how ``apps`` is built is not visible in the extract;
        # the two resources registered above are presumed.
        apps = [eapp, app]  # inferred

        ec.deploy()  # inferred

        ec.wait_finished(apps)

        state = ec.state(eapp)
        self.assertEqual(state, ResourceState.FAILED)

        state = ec.state(app)
        self.assertEqual(state, ResourceState.STOPPED)

        ec.shutdown()  # inferred

        self.assertEqual(ec._fm._failure_level, FailureLevel.OK)

    def test_start_with_condition(self):
        """app2 is conditioned on app1 being STARTED, with time "5s"."""
        from nepi.execution.resource import ResourceFactory

        ResourceFactory.register_type(Application)
        ResourceFactory.register_type(Node)
        ResourceFactory.register_type(Interface)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        app1 = ec.register_resource("Application")
        ec.register_connection(app1, node)

        app2 = ec.register_resource("Application")
        ec.register_connection(app2, node)

        ec.register_condition(app2, ResourceAction.START, app1,
                ResourceState.STARTED, time="5s")

        ec.deploy()  # inferred

        ec.wait_finished([app1, app2])

        rmapp1 = ec.get_resource(app1)
        rmapp2 = ec.get_resource(app2)

        self.assertTrue(rmapp2.start_time > rmapp1.start_time)

        ec.shutdown()  # inferred

    def test_stop_with_condition(self):
        """app2 is conditioned on app1 being STOPPED before it starts."""
        from nepi.execution.resource import ResourceFactory

        ResourceFactory.register_type(Application)
        ResourceFactory.register_type(Node)
        ResourceFactory.register_type(Interface)

        ec = ExperimentController()

        node = ec.register_resource("Node")

        app1 = ec.register_resource("Application")
        ec.register_connection(app1, node)

        app2 = ec.register_resource("Application")
        ec.register_connection(app2, node)

        ec.register_condition(app2, ResourceAction.START, app1,
                ResourceState.STOPPED)

        ec.deploy()  # inferred

        ec.wait_finished([app1, app2])

        rmapp1 = ec.get_resource(app1)
        rmapp2 = ec.get_resource(app2)

        self.assertTrue(rmapp2.start_time > rmapp1.stop_time)

        ec.shutdown()  # inferred

    def ztest_set_with_condition(self):
        # NOTE(review): the body of this method is not visible in the
        # recovered extract; ``pass`` keeps the class syntactically valid.
        # The name does not start with "test", so unittest's default loader
        # does not collect it.
        pass
419 if __name__ == '__main__':