use print() - import print_function - should be fine for both py2 and py3
[nepi.git] / test / resources / omf / set_hook.py
1 #!/usr/bin/env python
2 #
3 #    NEPI, a framework to manage network experiments
4 #    Copyright (C) 2013 INRIA
5 #
6 #    This program is free software: you can redistribute it and/or modify
7 #    it under the terms of the GNU General Public License version 2 as
8 #    published by the Free Software Foundation;
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Julien Tribino <julien.tribino@inria.fr>
19
20 from __future__ import print_function
21
22 from nepi.execution.resource import (ResourceFactory, clsinit_copy,
23                                      ResourceManager, ResourceAction, ResourceState)
24 from nepi.execution.ec import ExperimentController
25 from nepi.execution.attribute import Attribute, Flags 
26
27 from nepi.resources.omf.node import OMFNode
28 from nepi.resources.omf.application import OMFApplication
29
30 from nepi.util.timefuncs import *
31
32 import time
33 import unittest
34
class DummyEC(ExperimentController):
    """Experiment controller used by this test.

    Adds nothing to ExperimentController; it only provides a distinct
    class name for the controller instantiated in the test case.
    """
37
@clsinit_copy
class OMFDummyApplication(OMFApplication):
    """OMF application resource exposing one extra attribute, ``test``.

    The ``test`` attribute is registered with ``test_hook`` as its
    ``set_hook``. The test case in this file expects that a value set on
    ``test`` is read back multiplied by ten.
    """

    _rtype = "omf::DummyApplication"

    @classmethod
    def _register_attributes(cls):
        """Register the ``test`` attribute (default 0) with its set hook."""
        cls._register_attribute(
            Attribute(
                "test",
                "Input of the application",
                default=0,
                set_hook=cls.test_hook,
            )
        )

    @classmethod
    def test_hook(cls, old_value, new_value):
        """Multiply the incoming value by ten, log the change, return it.

        ``old_value`` is only used in the printed message.
        """
        # In-place multiplication is kept deliberately so that the
        # behaviour on mutable values matches a plain ``*=``.
        new_value *= 10
        pieces = (
            "Change the value of test from ",
            str(old_value),
            " to : ",
            str(new_value),
        )
        print("".join(pieces))
        return new_value
52
53
class OMFTestSet(unittest.TestCase):
    """Checks that an attribute's ``set_hook`` transforms values set via the EC."""

    def test_set_hook(self):
        """Deploy a node and a dummy application, then set ``test`` twice.

        Each value written to the ``test`` attribute is expected to be
        read back multiplied by ten (3 -> 30, 101 -> 1010).

        NOTE(review): the resources point at an external XMPP/OMF testbed
        (hostnames below), so this presumably needs network access to
        that testbed to complete -- confirm before running in CI.
        """
        self.ec = DummyEC(exp_id = "30")
        # Always stop the experiment, even when an assertion or the
        # deployment fails; otherwise the controller is left running.
        self.addCleanup(self.ec.shutdown)

        ResourceFactory.register_type(OMFDummyApplication)

        self.node1 = self.ec.register_resource("omf::Node")
        self.ec.set(self.node1, 'hostname', 'omf.plexus.wlab17')
        self.ec.set(self.node1, 'xmppSlice', "nepi")
        self.ec.set(self.node1, 'xmppHost', "xmpp-plexus.onelab.eu")
        self.ec.set(self.node1, 'xmppPort', "5222")
        self.ec.set(self.node1, 'xmppPassword', "1234")

        self.app1 = self.ec.register_resource("omf::DummyApplication")
        self.ec.set(self.app1, 'appid', 'Test#1')
        self.ec.set(self.app1, 'path', "/usr/bin/ping")
        self.ec.set(self.app1, 'args', "")
        self.ec.set(self.app1, 'env', "")
        self.ec.set(self.app1, 'xmppSlice', "nepi")
        self.ec.set(self.app1, 'xmppHost', "xmpp-plexus.onelab.eu")
        self.ec.set(self.app1, 'xmppPort', "5222")
        self.ec.set(self.app1, 'xmppPassword', "1234")

        self.ec.register_connection(self.app1, self.node1)

        # Condition: STOP app1 relative to app1 reaching STARTED, with "10s".
        self.ec.register_condition(self.app1, ResourceAction.STOP, self.app1, ResourceState.STARTED , "10s")

        self.ec.deploy()

        time.sleep(3)
        print("First try to change the STDIN")
        self.ec.set(self.app1, 'test', 3)

        # assertEqual replaces the deprecated assertEquals alias, which
        # no longer exists in Python 3.12+.
        self.assertEqual(self.ec.get(self.app1, 'test'), 30)

        time.sleep(3)
        print("Second try to change the STDIN")
        self.ec.set(self.app1, 'test', 101)
        self.assertEqual(self.ec.get(self.app1, 'test'), 1010)

        self.ec.wait_finished([self.app1])
99
100
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
103
104
105