X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=test%2Fexecution%2Fresource.py;h=6a4fd8c06163e91be4329b4cf49237028812a1af;hb=1d2350d56f314a6e3de43517a66f7e2f48128d44;hp=202a148e8d4c3656304479ec32d527e86d3b22c0;hpb=f1cb97fc50045f85a86bf6e16e0bd58b7a891a9d;p=nepi.git diff --git a/test/execution/resource.py b/test/execution/resource.py index 202a148e..6a4fd8c0 100755 --- a/test/execution/resource.py +++ b/test/execution/resource.py @@ -1,10 +1,33 @@ #!/usr/bin/env python -from neco.execution.resource import ResourceManager, ResourceFactory, clsinit -from neco.execution.attribute import Attribute +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.attribute import Attribute +from nepi.execution.ec import ExperimentController, FailureLevel +from nepi.execution.resource import ResourceManager, ResourceState, \ + clsinit_copy, ResourceAction + +import random +import time import unittest -@clsinit +@clsinit_copy class MyResource(ResourceManager): _rtype = "MyResource" @@ -16,35 +39,382 @@ class MyResource(ResourceManager): def __init__(self, ec, guid): super(MyResource, self).__init__(ec, guid) -@clsinit +@clsinit_copy class AnotherResource(ResourceManager): _rtype = "AnotherResource" def __init__(self, ec, guid): super(AnotherResource, self).__init__(ec, guid) - -class EC(object): - pass +class Channel(ResourceManager): + _rtype = "Channel" + + def __init__(self, ec, guid): + super(Channel, self).__init__(ec, guid) + + def do_deploy(self): + time.sleep(1) + super(Channel, self).do_deploy() + self.logger.debug(" -------- DEPLOYED ------- ") + +class Interface(ResourceManager): + _rtype = "Interface" + + def __init__(self, ec, guid): + super(Interface, self).__init__(ec, guid) + + def do_deploy(self): + node = self.get_connected(Node.get_rtype())[0] + chan = self.get_connected(Channel.get_rtype())[0] + + if node.state < ResourceState.PROVISIONED: + self.ec.schedule("0.5s", self.deploy) + elif chan.state < ResourceState.READY: + self.ec.schedule("0.5s", self.deploy) + else: + time.sleep(2) + super(Interface, self).do_deploy() + self.logger.debug(" -------- DEPLOYED ------- ") + +class Node(ResourceManager): + _rtype = "Node" + + def __init__(self, ec, guid): + super(Node, self).__init__(ec, guid) + + def do_deploy(self): + if self.state == ResourceState.NEW: + self.do_discover() + self.do_provision() + self.logger.debug(" -------- PROVISIONED ------- ") + self.ec.schedule("1s", self.deploy) + elif self.state == ResourceState.PROVISIONED: + ifaces = self.get_connected(Interface.get_rtype()) + for rm in ifaces: + if rm.state < ResourceState.READY: + self.ec.schedule("0.5s", self.deploy) + return + + super(Node, self).do_deploy() + self.logger.debug(" -------- DEPLOYED ------- ") + +class Application(ResourceManager): + _rtype = "Application" + + def __init__(self, ec, guid): + super(Application, self).__init__(ec, guid) + + def do_deploy(self): + node = self.get_connected(Node.get_rtype())[0] + if node.state < ResourceState.READY: + self.ec.schedule("0.5s", self.deploy) + else: + time.sleep(random.random() * 2) + super(Application, self).do_deploy() + self.logger.debug(" -------- DEPLOYED ------- ") + + def do_start(self): + super(Application, self).do_start() + time.sleep(random.random() * 3) + self.ec.schedule("0.5s", self.stop) + +class ErrorApplication(ResourceManager): + _rtype = "ErrorApplication" + + def __init__(self, ec, guid): + super(ErrorApplication, self).__init__(ec, guid) + + def do_deploy(self): + node = self.get_connected(Node.get_rtype())[0] + if node.state < ResourceState.READY: + self.ec.schedule("0.5s", self.deploy) + else: + time.sleep(random.random() * 2) + raise RuntimeError("NOT A REAL ERROR. JUST TESTING") -class ResourceTestCase(unittest.TestCase): +class ResourceFactoryTestCase(unittest.TestCase): def test_add_resource_factory(self): + from nepi.execution.resource import ResourceFactory + + ResourceFactory._resource_types = dict() ResourceFactory.register_type(MyResource) ResourceFactory.register_type(AnotherResource) - self.assertEquals(MyResource.rtype(), "MyResource") - self.assertEquals(len(MyResource._attributes), 1) + # Take into account default 'Critical' attribute + self.assertEqual(MyResource.get_rtype(), "MyResource") + self.assertEqual(len(MyResource._attributes), 3) + + self.assertEqual(ResourceManager.get_rtype(), "Resource") + self.assertEqual(len(ResourceManager._attributes), 2) + + self.assertEqual(AnotherResource.get_rtype(), "AnotherResource") + self.assertEqual(len(AnotherResource._attributes), 2) + + self.assertEqual(len(ResourceFactory.resource_types()), 2) + + # restore factory state for other tests + from nepi.execution.resource import populate_factory + ResourceFactory._resource_types = dict() + populate_factory() + +class ResourceManagerTestCase(unittest.TestCase): + def test_register_condition(self): + ec = ExperimentController() + rm = ResourceManager(ec, 15) + + group = [1,3,5,7] + rm.register_condition(ResourceAction.START, group, + ResourceState.STARTED) + + group = [10,8] + rm.register_condition(ResourceAction.START, + group, ResourceState.STARTED, time = "10s") + + waiting_for = [] + conditions = rm.conditions.get(ResourceAction.START) + for (group, state, time) in conditions: + waiting_for.extend(group) + + self.assertEqual(waiting_for, [1, 3, 5, 7, 10, 8]) + + group = [1, 2, 3, 4, 6] + rm.unregister_condition(group) + + waiting_for = [] + conditions = rm.conditions.get(ResourceAction.START) + for (group, state, time) in conditions: + waiting_for.extend(group) + + self.assertEqual(waiting_for, [5, 7, 10, 8]) + + def test_deploy_in_order(self): + """ + Test scenario: 2 applications running one on 1 node each. + Nodes are connected to Interfaces which are connected + through a channel between them. + + - Application needs to wait until Node is ready to be ready + - Node needs to wait until Interface is ready to be ready + - Interface needs to wait until Node is provisioned to be ready + - Interface needs to wait until Channel is ready to be ready + - The channel doesn't wait for any other resource to be ready + + """ + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(Application) + ResourceFactory.register_type(Node) + ResourceFactory.register_type(Interface) + ResourceFactory.register_type(Channel) + + ec = ExperimentController() + + app1 = ec.register_resource("Application") + app2 = ec.register_resource("Application") + node1 = ec.register_resource("Node") + node2 = ec.register_resource("Node") + iface1 = ec.register_resource("Interface") + iface2 = ec.register_resource("Interface") + chan = ec.register_resource("Channel") + + ec.register_connection(app1, node1) + ec.register_connection(app2, node2) + ec.register_connection(iface1, node1) + ec.register_connection(iface2, node2) + ec.register_connection(iface1, chan) + ec.register_connection(iface2, chan) + + ec.deploy() + + guids = [app1, app2] + ec.wait_finished(guids) + + ec.shutdown() + + rmapp1 = ec.get_resource(app1) + rmapp2 = ec.get_resource(app2) + rmnode1 = ec.get_resource(node1) + rmnode2 = ec.get_resource(node2) + rmiface1 = ec.get_resource(iface1) + rmiface2 = ec.get_resource(iface2) + rmchan = ec.get_resource(chan) + + ## Validate deploy order + # - Application needs to wait until Node is ready to be ready + self.assertTrue(rmnode1.ready_time < rmapp1.ready_time) + self.assertTrue(rmnode2.ready_time < rmapp2.ready_time) + + # - Node needs to wait until Interface is ready to be ready + self.assertTrue(rmnode1.ready_time > rmiface1.ready_time) + self.assertTrue(rmnode2.ready_time > rmiface2.ready_time) + + # - Interface needs to wait until Node is provisioned to be ready + self.assertTrue(rmnode1.provision_time < rmiface1.ready_time) + self.assertTrue(rmnode2.provision_time < rmiface2.ready_time) + + # - Interface needs to wait until Channel is ready to be ready + self.assertTrue(rmchan.ready_time < rmiface1.ready_time) + self.assertTrue(rmchan.ready_time < rmiface2.ready_time) + + def test_concurrency(self): + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(Application) + ResourceFactory.register_type(Node) + ResourceFactory.register_type(Interface) + ResourceFactory.register_type(Channel) + + ec = ExperimentController() + + node = ec.register_resource("Node") + + apps = list() + for i in range(1000): + app = ec.register_resource("Application") + ec.register_connection(app, node) + apps.append(app) + + ec.deploy() + + ec.wait_finished(apps) + + self.assertTrue(ec.state(node) == ResourceState.STARTED) + self.assertTrue( + all([ec.state(guid) == ResourceState.STOPPED \ + for guid in apps]) + ) + + ec.shutdown() + + def test_exception(self): + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(ErrorApplication) + ResourceFactory.register_type(Node) + ResourceFactory.register_type(Interface) + + ec = ExperimentController() + + node = ec.register_resource("Node") + + app = ec.register_resource("ErrorApplication") + ec.register_connection(app, node) + + ec.deploy() + + ec.wait_finished(app) + + ec.shutdown() + + self.assertEqual(ec._fm._failure_level, FailureLevel.RM_FAILURE) + + def test_critical(self): + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(ErrorApplication) + ResourceFactory.register_type(Application) + ResourceFactory.register_type(Node) + ResourceFactory.register_type(Interface) + + ec = ExperimentController() + + node = ec.register_resource("Node") + + apps = list() + + eapp = ec.register_resource("ErrorApplication") + ec.set(eapp, "critical", False) + ec.register_connection(eapp, node) + apps.append(eapp) + + for i in range(10): + app = ec.register_resource("Application") + ec.register_connection(app, node) + apps.append(app) + + ec.deploy() + + ec.wait_finished(apps) + + state = ec.state(eapp) + self.assertEqual(state, ResourceState.FAILED) + + apps.remove(eapp) + + for app in apps: + state = ec.state(app) + self.assertEqual(state, ResourceState.STOPPED) + + ec.shutdown() + + self.assertEqual(ec._fm._failure_level, FailureLevel.OK) + + def test_start_with_condition(self): + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(Application) + ResourceFactory.register_type(Node) + ResourceFactory.register_type(Interface) + + ec = ExperimentController() + + node = ec.register_resource("Node") + + app1 = ec.register_resource("Application") + ec.register_connection(app1, node) + + app2 = ec.register_resource("Application") + ec.register_connection(app2, node) + + ec.register_condition(app2, ResourceAction.START, app1, + ResourceState.STARTED, time = "5s") + + ec.deploy() + + ec.wait_finished([app1, app2]) + + rmapp1 = ec.get_resource(app1) + rmapp2 = ec.get_resource(app2) + + self.assertTrue(rmapp2.start_time > rmapp1.start_time) + + ec.shutdown() + + def test_stop_with_condition(self): + from nepi.execution.resource import ResourceFactory + + ResourceFactory.register_type(Application) + ResourceFactory.register_type(Node) + ResourceFactory.register_type(Interface) + + ec = ExperimentController() + + node = ec.register_resource("Node") + + app1 = ec.register_resource("Application") + ec.register_connection(app1, node) + + app2 = ec.register_resource("Application") + ec.register_connection(app2, node) + + ec.register_condition(app2, ResourceAction.START, app1, + ResourceState.STOPPED) + + ec.deploy() - self.assertEquals(Resource.rtype(), "Resource") - self.assertEquals(len(Resource._attributes), 0) + ec.wait_finished([app1, app2]) + + rmapp1 = ec.get_resource(app1) + rmapp2 = ec.get_resource(app2) - self.assertEquals(AnotherResource.rtype(), "AnotherResource") - self.assertEquals(len(AnotherResource._attributes), 0) + self.assertTrue(rmapp2.start_time > rmapp1.stop_time) + + ec.shutdown() - #self.assertEquals(OmfNode.rtype(), "OmfNode") - #self.assertEquals(len(OmfNode._attributes), 0) + def ztest_set_with_condition(self): + # TODO!!! + pass - self.assertEquals(len(ResourceFactory.resource_types()), 2) if __name__ == '__main__': unittest.main()