3 # NEPI, a framework to manage network experiments
4 # Copyright (C) 2013 INRIA
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
21 from nepi.execution.ec import ExperimentController
23 from nepi.resources.planetlab.sfa_node import PlanetlabSfaNode
24 from nepi.util.sfaapi import SFAAPI, SFAAPIFactory
26 from test_utils import skipIfNotSfaCredentials
31 import multiprocessing
class DummyEC(ExperimentController):
    """
    Test-local subclass of ExperimentController.

    NOTE(review): no class body is visible in this view; presumably the
    class adds nothing on top of ExperimentController -- confirm.
    """
class PLSfaNodeFactoryTestCase(unittest.TestCase):
    """
    Checks on the PlanetlabSfaNode class itself, without creating any
    resource.
    """

    def test_creation_phase(self):
        """
        Check the resource type name and the number of attributes declared
        on PlanetlabSfaNode.
        """
        # assertEqual replaces the deprecated assertEquals alias, which no
        # longer exists in recent Python 3 releases.
        self.assertEqual(PlanetlabSfaNode._rtype, "PlanetlabSfaNode")
        self.assertEqual(len(PlanetlabSfaNode._attributes), 29)
class PLSfaNodeTestCase(unittest.TestCase):
    """
    These tests use the inria_nepi slice from the test instance of MyPLC,
    nepiplc.pl.sophia.inria.fr. They can fail if the user running them
    does not have an account on this MyPLC instance or has not been added
    to the inria_nepi slice.
    """

    def setUp(self):
        """
        Create the controller and read the SFA credentials from the
        SFA_SLICE, SFA_USER and SFA_PK environment variables (each value
        is None when the variable is not set).
        """
        # Every test drives self.ec, so it must exist before any test runs.
        self.ec = DummyEC()
        self.username = os.environ.get('SFA_SLICE')
        self.sfauser = os.environ.get('SFA_USER')
        self.sfaPrivateKey = os.environ.get('SFA_PK')

    def _register_node(self):
        """
        Register a PlanetlabSfaNode for planetlab2.ionio.gr, configure it
        with the credentials read in setUp, and return the value handed
        back by register_resource.
        """
        node = self.ec.register_resource("PlanetlabSfaNode")
        self.ec.set(node, "hostname", "planetlab2.ionio.gr")
        self.ec.set(node, "username", self.username)
        self.ec.set(node, "sfauser", self.sfauser)
        self.ec.set(node, "sfaPrivateKey", self.sfaPrivateKey)
        return node

    @skipIfNotSfaCredentials
    def test_a_sfaapi(self):
        """
        Check that the api to discover and reserve resources is well
        instantiated, and is an instance of SFAAPI. Check that using
        the same credentials, the same object of the api is used.
        """
        node1 = self._register_node()
        plnode_rm1 = self.ec.get_resource(node1)

        # Nothing has been discovered yet for a freshly registered node.
        self.assertIsNone(plnode_rm1._node_to_provision)

        api1 = plnode_rm1.sfaapi
        self.assertIsInstance(api1, SFAAPI)
        self.assertEqual(len(api1.reserved()), 0)
        self.assertEqual(len(api1.blacklisted()), 0)

        # A second node registered with identical credentials.
        node2 = self._register_node()
        plnode_rm2 = self.ec.get_resource(node2)
        api2 = plnode_rm2.sfaapi
        self.assertEqual(api1, api2)

    @skipIfNotSfaCredentials
    def test_discover(self):
        """
        Check that the method do_discover reserves the right node.
        """
        node = self._register_node()
        plnode_rm = self.ec.get_resource(node)

        hostname = plnode_rm.get("hostname")
        self.assertIsNotNone(hostname)

        self.assertEqual(plnode_rm.sfaapi.reserved(), set())

        plnode_rm.do_discover()
        # NOTE(review): pop() also removes the entry from the set returned
        # by reserved(); if that is the api's own set, later tests that
        # expect an empty reservation may rely on it -- confirm before
        # replacing it with a non-mutating check.
        self.assertEqual(plnode_rm.sfaapi.reserved().pop(),
                         'ple.dbislab.planetlab2.ionio.gr')
        self.assertEqual(plnode_rm._node_to_provision,
                         'ple.dbislab.planetlab2.ionio.gr')

    @skipIfNotSfaCredentials
    def test_provision(self):
        """
        Check that the method do_provision adds the node to the slice and
        that the node works, then remove the node from the slice again.
        """
        node = self._register_node()
        plnode_rm = self.ec.get_resource(node)

        self.assertEqual(plnode_rm.sfaapi.reserved(), set())
        self.assertIsNone(plnode_rm._node_to_provision)

        # Slice name: 'ple.' followed by the username with '_' turned to '.'.
        slicename = 'ple.' + self.username.replace('_', '.')

        plnode_rm.do_discover()
        plnode_rm.do_provision()

        # Run a trivial command on the node to prove it is usable.
        cmd = 'echo "IT WORKED"'
        ((out, err), proc) = plnode_rm.execute(cmd)
        self.assertEqual(out.strip(), "IT WORKED")

        # Take the node out of the slice and verify that it is gone.
        urn_to_delete = 'urn:publicid:IDN+ple:dbislab+node+planetlab2.ionio.gr'
        plnode_rm.sfaapi.remove_resource_from_slice(slicename, urn_to_delete)

        slice_resources = plnode_rm.sfaapi.get_slice_resources(slicename)['resource']

        slice_resources_hrn = plnode_rm.sfaapi.get_resources_hrn(slice_resources)
        self.assertNotIn('planetlab2.ionio.gr', slice_resources_hrn.keys())

    @skipIfNotSfaCredentials
    def test_xdeploy(self):
        """
        Deploy a single node through the ExperimentController, so that
        discovery and provisioning are driven by the controller, and check
        the state code the node reaches.
        """
        node = self._register_node()

        # wait_deployed only waits; the deployment has to be started first.
        self.ec.deploy()
        self.ec.wait_deployed(node)
        state = self.ec.state(node)
        # NOTE(review): 3 is a raw state code, presumably one of nepi's
        # resource state constants -- confirm and prefer the named constant.
        self.assertEqual(state, 3)
if __name__ == '__main__':
    # Run the test cases defined in this module when it is executed as a
    # script; the guard needs a body to be valid Python.
    unittest.main()