3 # NEPI, a framework to manage network experiments
4 # Copyright (C) 2013 INRIA
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
22 from nepi.execution.attribute import Attribute
23 from nepi.execution.ec import ExperimentController, FailureLevel
24 from nepi.execution.resource import ResourceManager, ResourceState, \
25 clsinit_copy, ResourceAction, failtrap
32 class MyResource(ResourceManager):
36 def _register_attributes(cls):
37 cool_attr = Attribute("my_attr", "is a really nice attribute!")
38 cls._register_attribute(cool_attr)
40 def __init__(self, ec, guid):
41 super(MyResource, self).__init__(ec, guid)
44 class AnotherResource(ResourceManager):
45 _rtype = "AnotherResource"
47 def __init__(self, ec, guid):
48 super(AnotherResource, self).__init__(ec, guid)
50 class Channel(ResourceManager):
53 def __init__(self, ec, guid):
54 super(Channel, self).__init__(ec, guid)
58 super(Channel, self).deploy()
59 self.logger.debug(" -------- DEPLOYED ------- ")
61 class Interface(ResourceManager):
64 def __init__(self, ec, guid):
65 super(Interface, self).__init__(ec, guid)
68 node = self.get_connected(Node.rtype())[0]
69 chan = self.get_connected(Channel.rtype())[0]
71 if node.state < ResourceState.PROVISIONED:
72 self.ec.schedule("0.5s", self.deploy)
73 elif chan.state < ResourceState.READY:
74 self.ec.schedule("0.5s", self.deploy)
77 super(Interface, self).deploy()
78 self.logger.debug(" -------- DEPLOYED ------- ")
80 class Node(ResourceManager):
83 def __init__(self, ec, guid):
84 super(Node, self).__init__(ec, guid)
87 if self.state == ResourceState.NEW:
90 self.logger.debug(" -------- PROVISIONED ------- ")
91 self.ec.schedule("1s", self.deploy)
92 elif self.state == ResourceState.PROVISIONED:
93 ifaces = self.get_connected(Interface.rtype())
95 if rm.state < ResourceState.READY:
96 self.ec.schedule("0.5s", self.deploy)
99 super(Node, self).deploy()
100 self.logger.debug(" -------- DEPLOYED ------- ")
102 class Application(ResourceManager):
103 _rtype = "Application"
105 def __init__(self, ec, guid):
106 super(Application, self).__init__(ec, guid)
109 node = self.get_connected(Node.rtype())[0]
110 if node.state < ResourceState.READY:
111 self.ec.schedule("0.5s", self.deploy)
113 time.sleep(random.random() * 2)
114 super(Application, self).deploy()
115 self.logger.debug(" -------- DEPLOYED ------- ")
118 super(Application, self).start()
119 time.sleep(random.random() * 3)
120 self._state = ResourceState.FINISHED
122 class ErrorApplication(ResourceManager):
123 _rtype = "ErrorApplication"
125 def __init__(self, ec, guid):
126 super(ErrorApplication, self).__init__(ec, guid)
130 node = self.get_connected(Node.rtype())[0]
131 if node.state < ResourceState.READY:
132 self.ec.schedule("0.5s", self.deploy)
134 time.sleep(random.random() * 2)
135 raise RuntimeError, "NOT A REAL ERROR. JUST TESTING"
137 class ResourceFactoryTestCase(unittest.TestCase):
138 def test_add_resource_factory(self):
139 from nepi.execution.resource import ResourceFactory
141 ResourceFactory._resource_types = dict()
142 ResourceFactory.register_type(MyResource)
143 ResourceFactory.register_type(AnotherResource)
145 self.assertEquals(MyResource.rtype(), "MyResource")
146 self.assertEquals(len(MyResource._attributes), 2)
148 self.assertEquals(ResourceManager.rtype(), "Resource")
149 self.assertEquals(len(ResourceManager._attributes), 1)
151 self.assertEquals(AnotherResource.rtype(), "AnotherResource")
152 self.assertEquals(len(AnotherResource._attributes), 1)
154 self.assertEquals(len(ResourceFactory.resource_types()), 2)
156 # restore factory state for other tests
157 from nepi.execution.resource import populate_factory
158 ResourceFactory._resource_types = dict()
161 class ResourceManagerTestCase(unittest.TestCase):
162 def test_register_condition(self):
163 ec = ExperimentController()
164 rm = ResourceManager(ec, 15)
167 rm.register_condition(ResourceAction.START, group,
168 ResourceState.STARTED)
171 rm.register_condition(ResourceAction.START,
172 group, ResourceState.STARTED, time = "10s")
175 conditions = rm.conditions.get(ResourceAction.START)
176 for (group, state, time) in conditions:
177 waiting_for.extend(group)
179 self.assertEquals(waiting_for, [1, 3, 5, 7, 10, 8])
181 group = [1, 2, 3, 4, 6]
182 rm.unregister_condition(group)
185 conditions = rm.conditions.get(ResourceAction.START)
186 for (group, state, time) in conditions:
187 waiting_for.extend(group)
189 self.assertEquals(waiting_for, [5, 7, 10, 8])
191 def test_deploy_in_order(self):
193 Test scenario: 2 applications running one on 1 node each.
194 Nodes are connected to Interfaces which are connected
195 through a channel between them.
197 - Application needs to wait until Node is ready to be ready
198 - Node needs to wait until Interface is ready to be ready
199 - Interface needs to wait until Node is provisioned to be ready
200 - Interface needs to wait until Channel is ready to be ready
201 - The channel doesn't wait for any other resource to be ready
204 from nepi.execution.resource import ResourceFactory
206 ResourceFactory.register_type(Application)
207 ResourceFactory.register_type(Node)
208 ResourceFactory.register_type(Interface)
209 ResourceFactory.register_type(Channel)
211 ec = ExperimentController()
213 app1 = ec.register_resource("Application")
214 app2 = ec.register_resource("Application")
215 node1 = ec.register_resource("Node")
216 node2 = ec.register_resource("Node")
217 iface1 = ec.register_resource("Interface")
218 iface2 = ec.register_resource("Interface")
219 chan = ec.register_resource("Channel")
221 ec.register_connection(app1, node1)
222 ec.register_connection(app2, node2)
223 ec.register_connection(iface1, node1)
224 ec.register_connection(iface2, node2)
225 ec.register_connection(iface1, chan)
226 ec.register_connection(iface2, chan)
231 ec.wait_finished(guids)
235 rmapp1 = ec.get_resource(app1)
236 rmapp2 = ec.get_resource(app2)
237 rmnode1 = ec.get_resource(node1)
238 rmnode2 = ec.get_resource(node2)
239 rmiface1 = ec.get_resource(iface1)
240 rmiface2 = ec.get_resource(iface2)
241 rmchan = ec.get_resource(chan)
243 ## Validate deploy order
244 # - Application needs to wait until Node is ready to be ready
245 self.assertTrue(rmnode1.ready_time < rmapp1.ready_time)
246 self.assertTrue(rmnode2.ready_time < rmapp2.ready_time)
248 # - Node needs to wait until Interface is ready to be ready
249 self.assertTrue(rmnode1.ready_time > rmiface1.ready_time)
250 self.assertTrue(rmnode2.ready_time > rmiface2.ready_time)
252 # - Interface needs to wait until Node is provisioned to be ready
253 self.assertTrue(rmnode1.provision_time < rmiface1.ready_time)
254 self.assertTrue(rmnode2.provision_time < rmiface2.ready_time)
256 # - Interface needs to wait until Channel is ready to be ready
257 self.assertTrue(rmchan.ready_time < rmiface1.ready_time)
258 self.assertTrue(rmchan.ready_time < rmiface2.ready_time)
260 def test_concurrency(self):
261 from nepi.execution.resource import ResourceFactory
263 ResourceFactory.register_type(Application)
264 ResourceFactory.register_type(Node)
265 ResourceFactory.register_type(Interface)
266 ResourceFactory.register_type(Channel)
268 ec = ExperimentController()
270 node = ec.register_resource("Node")
273 for i in xrange(1000):
274 app = ec.register_resource("Application")
275 ec.register_connection(app, node)
280 ec.wait_finished(apps)
282 self.assertTrue(ec.state(node) == ResourceState.STARTED)
284 all([ec.state(guid) == ResourceState.FINISHED \
290 def test_exception(self):
291 from nepi.execution.resource import ResourceFactory
293 ResourceFactory.register_type(ErrorApplication)
294 ResourceFactory.register_type(Node)
295 ResourceFactory.register_type(Interface)
296 ResourceFactory.register_type(Channel)
298 ec = ExperimentController()
300 node = ec.register_resource("Node")
304 app = ec.register_resource("ErrorApplication")
305 ec.register_connection(app, node)
311 ec.wait_finished(apps)
315 self.assertTrue(ec._fm._failure_level == FailureLevel.RM_FAILURE)
318 def ztest_start_with_condition(self):
322 def ztest_stop_with_condition(self):
326 def ztest_set_with_condition(self):
331 if __name__ == '__main__':