c297aff27bd371091dff3d187fb40c43867bf0b3
[nepi.git] / test / resources / omf / set_hook.py
1 #!/usr/bin/env python
2 #
3 #    NEPI, a framework to manage network experiments
4 #    Copyright (C) 2013 INRIA
5 #
6 #    This program is free software: you can redistribute it and/or modify
7 #    it under the terms of the GNU General Public License version 2 as
8 #    published by the Free Software Foundation;
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Julien Tribino <julien.tribino@inria.fr>
19
20
21 from nepi.execution.resource import ResourceFactory, clsinit_copy, ResourceManager, ResourceAction, ResourceState
22 from nepi.execution.ec import ExperimentController
23 from nepi.execution.attribute import Attribute, Flags 
24
25 from nepi.resources.omf.node import OMFNode
26 from nepi.resources.omf.application import OMFApplication
27
28 from nepi.util.timefuncs import *
29
30 import time
31 import unittest
32
33 class DummyEC(ExperimentController):
34     pass
35
36 @clsinit_copy
37 class OMFDummyApplication(OMFApplication):
38     _rtype = "omf::DummyApplication"
39
40     @classmethod
41     def _register_attributes(cls):
42         test = Attribute("test", "Input of the application", default = 0, set_hook = cls.test_hook)
43         cls._register_attribute(test)
44
45     @classmethod
46     def test_hook(cls, old_value, new_value):
47         new_value *= 10
48         print "Change the value of test from "+ str(old_value) +" to : " + str(new_value)
49         return new_value
50
51
52 class OMFTestSet(unittest.TestCase):
53
54     def test_set_hook(self):
55         self.ec = DummyEC(exp_id = "30")
56
57         ResourceFactory.register_type(OMFDummyApplication)
58
59         self.node1 = self.ec.register_resource("omf::Node")
60         self.ec.set(self.node1, 'hostname', 'omf.plexus.wlab17')
61         self.ec.set(self.node1, 'xmppSlice', "nepi")
62         self.ec.set(self.node1, 'xmppHost', "xmpp-plexus.onelab.eu")
63         self.ec.set(self.node1, 'xmppPort', "5222")
64         self.ec.set(self.node1, 'xmppPassword', "1234")
65
66         self.app1 = self.ec.register_resource("omf::DummyApplication")
67         self.ec.set(self.app1, 'appid', 'Test#1')
68         self.ec.set(self.app1, 'path', "/usr/bin/ping")
69         self.ec.set(self.app1, 'args', "")
70         self.ec.set(self.app1, 'env', "")
71         self.ec.set(self.app1, 'xmppSlice', "nepi")
72         self.ec.set(self.app1, 'xmppHost', "xmpp-plexus.onelab.eu")
73         self.ec.set(self.app1, 'xmppPort', "5222")
74         self.ec.set(self.app1, 'xmppPassword', "1234")
75
76         self.ec.register_connection(self.app1, self.node1)
77
78         self.ec.register_condition(self.app1, ResourceAction.STOP, self.app1, ResourceState.STARTED , "10s")
79
80         self.ec.deploy()
81
82         time.sleep(3)
83         print "First try to change the STDIN"
84         self.ec.set(self.app1, 'test', 3)
85
86         self.assertEquals(self.ec.get(self.app1, 'test'), 30)
87
88         time.sleep(3)
89         print "Second try to change the STDIN"
90         self.ec.set(self.app1, 'test', 101)
91         self.assertEquals(self.ec.get(self.app1, 'test'), 1010)
92
93         self.ec.wait_finished([self.app1])
94
95         # Stop Experiment
96         self.ec.shutdown()
97
98
99 if __name__ == '__main__':
100     unittest.main()
101
102
103