Making runId as sub folder optional for the Collector RM
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay
23 from nepi.execution.attribute import Attribute, Flags 
24 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
25 from nepi.resources.omf.node import OMFNode
26 from nepi.resources.omf.omf_api import OMFAPIFactory
27
28 from nepi.util import sshfuncs
29
30 @clsinit_copy
31 class OMFApplication(OMFResource):
32     """
33     .. class:: Class Args :
34       
35         :param ec: The Experiment controller
36         :type ec: ExperimentController
37         :param guid: guid of the RM
38         :type guid: int
39
40     """
41     _rtype = "OMFApplication"
42     _authorized_connections = ["OMFNode"]
43
44     @classmethod
45     def _register_attributes(cls):
46         """ Register the attributes of an OMF application
47
48         """
49         appid = Attribute("appid", "Name of the application")
50         path = Attribute("path", "Path of the application")
51         args = Attribute("args", "Argument of the application")
52         env = Attribute("env", "Environnement variable of the application")
53         stdin = Attribute("stdin", "Input of the application", default = "")
54         sources = Attribute("sources", "Sources of the application", 
55                      flags = Flags.Design)
56         sshuser = Attribute("sshUser", "user to connect with ssh", 
57                      flags = Flags.Design)
58         sshkey = Attribute("sshKey", "key to use for ssh", 
59                      flags = Flags.Design)
60         cls._register_attribute(appid)
61         cls._register_attribute(path)
62         cls._register_attribute(args)
63         cls._register_attribute(env)
64         cls._register_attribute(stdin)
65         cls._register_attribute(sources)
66         cls._register_attribute(sshuser)
67         cls._register_attribute(sshkey)
68
69     def __init__(self, ec, guid):
70         """
71         :param ec: The Experiment controller
72         :type ec: ExperimentController
73         :param guid: guid of the RM
74         :type guid: int
75         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
76         :type creds: dict
77
78         """
79         super(OMFApplication, self).__init__(ec, guid)
80
81         self.set('appid', "")
82         self.set('path', "")
83         self.set('args', "")
84         self.set('env', "")
85
86         self._node = None
87
88         self._omf_api = None
89
90         self.add_set_hook()
91
92     @property
93     def exp_id(self):
94         return self.ec.exp_id
95
96     @property
97     def node(self):
98         rm_list = self.get_connected(OMFNode.get_rtype())
99         if rm_list: return rm_list[0]
100         return None
101
102     def stdin_hook(self, old_value, new_value):
103         """ Set a hook to the stdin attribute in order to send a message at each time
104         the value of this parameter is changed
105
106         """
107         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
108         return new_value
109
110     def add_set_hook(self):
111         """ Initialize the hooks
112
113         """
114         attr = self._attrs["stdin"]
115         attr.set_hook = self.stdin_hook
116
117     def valid_connection(self, guid):
118         """ Check if the connection with the guid in parameter is possible. 
119         Only meaningful connections are allowed.
120
121         :param guid: Guid of RM it will be connected
122         :type guid: int
123         :rtype:  Boolean
124
125         """
126         rm = self.ec.get_resource(guid)
127         if rm.get_rtype() not in self._authorized_connections:
128             msg = ("Connection between %s %s and %s %s refused: "
129                     "An Application can be connected only to a Node" ) % \
130                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
131             self.debug(msg)
132
133             return False
134
135         elif len(self.connections) != 0 :
136             msg = ("Connection between %s %s and %s %s refused: "
137                     "This Application is already connected" ) % \
138                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
139             self.debug(msg)
140
141             return False
142
143         else :
144             msg = "Connection between %s %s and %s %s accepted" % (
145                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
146             self.debug(msg)
147
148             return True
149
150     def do_deploy(self):
151         """ Deploy the RM. It means nothing special for an application 
152         for now (later it will be upload sources, ...)
153         It becomes DEPLOYED after getting the xmpp client.
154
155         """
156
157         self.set('xmppSlice',self.node.get('xmppSlice'))
158         self.set('xmppHost',self.node.get('xmppHost'))
159         self.set('xmppPort',self.node.get('xmppPort'))
160         self.set('xmppPassword',self.node.get('xmppPassword'))
161
162         if not (self.get('xmppSlice') and self.get('xmppHost')
163               and self.get('xmppPort') and self.get('xmppPassword')):
164             msg = "Credentials are not initialzed. XMPP Connections impossible"
165             self.error(msg)
166             raise RuntimeError, msg
167
168         if not self._omf_api :
169             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
170                 self.get('xmppHost'), self.get('xmppPort'), 
171                 self.get('xmppPassword'), exp_id = self.exp_id)
172
173         if self.get('sources'):
174             gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
175             user = self.get('sshUser') or self.get('xmppSlice')
176             dst = user + "@"+ gateway + ":"
177             (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
178
179         super(OMFApplication, self).do_deploy()
180
181     def do_start(self):
182         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
183          to execute the application. 
184          It becomes STARTED before the messages are sent (for coordination)
185
186         """
187         if not (self.get('appid') and self.get('path')) :
188             msg = "Application's information are not initialized"
189             self.error(msg)
190             raise RuntimeError, msg
191
192         if not self.get('args'):
193             self.set('args', " ")
194         if not self.get('env'):
195             self.set('env', " ")
196
197         # Some information to check the information in parameter
198         msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
199             self.get('appid') + " : " + self.get('path') + " : " + \
200             self.get('args') + " : " + self.get('env')
201         self.info(msg)
202
203         self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
204             self.get('args'), self.get('path'), self.get('env'))
205
206         super(OMFApplication, self).do_start()
207
208     def do_stop(self):
209         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
210         kill the application. 
211         State is set to STOPPED after the message is sent.
212
213         """
214
215         self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
216         super(OMFApplication, self).do_stop()
217
218     def do_release(self):
219         """ Clean the RM at the end of the experiment and release the API.
220
221         """
222         if self._omf_api:
223             OMFAPIFactory.release_api(self.get('xmppSlice'), 
224                 self.get('xmppHost'), self.get('xmppPort'), 
225                 self.get('xmppPassword'), exp_id = self.exp_id)
226
227         super(OMFApplication, self).do_release()
228