Changes for release
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import os
22
23 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24         ResourceState, reschedule_delay
25 from nepi.execution.attribute import Attribute, Flags 
26 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
27 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
28 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
29
30 from nepi.util import sshfuncs
31
32 @clsinit_copy
33 class OMFApplication(OMFResource):
34     """
35     .. class:: Class Args :
36       
37         :param ec: The Experiment controller
38         :type ec: ExperimentController
39         :param guid: guid of the RM
40         :type guid: int
41
42     """
43     _rtype = "OMFApplication"
44     _authorized_connections = ["OMFNode", "WilabtSfaNode"]
45
46     @classmethod
47     def _register_attributes(cls):
48         """ Register the attributes of an OMF application
49
50         """
51         command = Attribute("command", "Command to execute")
52         env = Attribute("env", "Environnement variable of the application")
53
54         # For OMF 5:
55         appid = Attribute("appid", "Name of the application")
56         stdin = Attribute("stdin", "Input of the application", default = "")
57         sources = Attribute("sources", "Sources of the application", 
58                      flags = Flags.Design)
59         sshuser = Attribute("sshUser", "user to connect with ssh", 
60                      flags = Flags.Design)
61         sshkey = Attribute("sshKey", "key to use for ssh", 
62                      flags = Flags.Design)
63
64         cls._register_attribute(appid)
65         cls._register_attribute(command)
66         cls._register_attribute(env)
67         cls._register_attribute(stdin)
68         cls._register_attribute(sources)
69         cls._register_attribute(sshuser)
70         cls._register_attribute(sshkey)
71
72     def __init__(self, ec, guid):
73         """
74         :param ec: The Experiment controller
75         :type ec: ExperimentController
76         :param guid: guid of the RM
77         :type guid: int
78         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
79         :type creds: dict
80
81         """
82         super(OMFApplication, self).__init__(ec, guid)
83
84         self.set('command', "")
85         self.set('appid', "")
86         self._path= ""
87         self._args = ""
88         self.set('env', "")
89
90         self._node = None
91
92         self._omf_api = None
93         self._topic_app = None
94         self.create_id = None
95         self._create_cnt = 0
96         self._start_cnt = 0
97         self.release_id = None
98         self._release_cnt = 0
99
100         self.add_set_hook()
101
102     def _init_command(self):
103         comm = self.get('command').split(' ')
104         self._path= comm[0]
105         if len(comm)>1:
106             self._args = ' '.join(comm[1:])
107
108     @property
109     def exp_id(self):
110         return self.ec.exp_id
111
112     @property
113     def node(self):
114         rm_list = self.get_connected(OMFNode.get_rtype())
115         if rm_list: return rm_list[0]
116         return None
117
118     def stdin_hook(self, old_value, new_value):
119         """ Set a hook to the stdin attribute in order to send a message at each time
120         the value of this parameter is changed. Used ofr OMF 5.4 only
121
122         """
123         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
124         return new_value
125
126     def add_set_hook(self):
127         """ Initialize the hooks for OMF 5.4 only
128
129         """
130         attr = self._attrs["stdin"]
131         attr.set_hook = self.stdin_hook
132
133     def valid_connection(self, guid):
134         """ Check if the connection with the guid in parameter is possible. 
135         Only meaningful connections are allowed.
136
137         :param guid: Guid of RM it will be connected
138         :type guid: int
139         :rtype:  Boolean
140
141         """
142         rm = self.ec.get_resource(guid)
143         if rm.get_rtype() not in self._authorized_connections:
144             msg = ("Connection between %s %s and %s %s refused: "
145                     "An Application can be connected only to a Node" ) % \
146                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
147             self.debug(msg)
148
149             return False
150
151         elif len(self.connections) != 0 :
152             msg = ("Connection between %s %s and %s %s refused: "
153                     "This Application is already connected" ) % \
154                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
155             self.debug(msg)
156
157             return False
158
159         else :
160             msg = "Connection between %s %s and %s %s accepted" % (
161                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
162             self.debug(msg)
163
164             return True
165
166     def do_deploy(self):
167         """ Deploy the RM. It means nothing special for an application 
168         for now (later it will be upload sources, ...)
169         It becomes DEPLOYED after the topic for the application has been created
170
171         """
172         if not self.node or self.node.state < ResourceState.READY:
173             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
174                        % self.node.state )
175             self.ec.schedule(reschedule_delay, self.deploy)
176             return
177
178         self._init_command()
179
180         self.set('xmppUser',self.node.get('xmppUser'))
181         self.set('xmppServer',self.node.get('xmppServer'))
182         self.set('xmppPort',self.node.get('xmppPort'))
183         self.set('xmppPassword',self.node.get('xmppPassword'))
184         self.set('version',self.node.get('version'))
185
186         if not self.get('xmppServer'):
187             msg = "XmppServer is not initialzed. XMPP Connections impossible"
188             self.error(msg)
189             raise RuntimeError, msg
190
191         if not (self.get('xmppUser') or self.get('xmppPort') 
192                    or self.get('xmppPassword')):
193             msg = "Credentials are not all initialzed. Default values will be used"
194             self.warn(msg)
195
196         if not self.get('command') :
197             msg = "Application's Command is not initialized"
198             self.error(msg)
199             raise RuntimeError, msg
200
201         if not self._omf_api :
202             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
203               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
204                self.get('xmppPassword'), exp_id = self.exp_id)
205
206         if self.get('version') == "5":
207             if self.get('sources'):
208                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
209                 user = self.get('sshUser') or self.get('xmppUser')
210                 dst = user + "@"+ gateway + ":"
211                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
212         else :
213             # For OMF 6 :
214             if not self.create_id:
215                 props = {}
216                 if self.get('command'):
217                     props['application:binary_path'] = self.get('command')
218                     props['application:hrn'] = self.get('command')
219                     props['application:membership'] = self._topic_app
220                 props['application:type'] = "application"
221     
222                 self.create_id = os.urandom(16).encode('hex')
223                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
224    
225             if self._create_cnt > confirmation_counter:
226                 msg = "Couldn't retrieve the confirmation of the creation"
227                 self.error(msg)
228                 raise RuntimeError, msg
229
230             uid = self.check_deploy(self.create_id)
231             if not uid:
232                 self._create_cnt +=1
233                 self.ec.schedule(reschedule_check, self.deploy)
234                 return
235
236             self._topic_app = uid
237             self._omf_api.enroll_topic(self._topic_app)
238
239         super(OMFApplication, self).do_deploy()
240
241     def check_deploy(self, cid):
242         """ Check, through the mail box in the parser, 
243         if the confirmation of the creation has been received
244
245         :param cid: the id of the original message
246         :type guid: string
247
248         """
249         uid = self._omf_api.check_mailbox("create", cid)
250         if uid : 
251             return uid
252         return False
253
254     def do_start(self):
255         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
256          to execute the application. 
257
258         """
259
260         if not self.get('env'):
261             self.set('env', " ")
262
263         if self.get('version') == "5":
264             # Some information to check the command for OMF5
265             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
266                 self.get('appid') + " : " + self._path + " : " + \
267                 self._args + " : " + self.get('env')
268             self.debug(msg)
269
270             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
271                 self._args, self._path, self.get('env'))
272         else:
273             #For OMF 6
274             if self._start_cnt == 0:
275                 props = {}
276                 props['state'] = "running"
277     
278                 guards = {}
279                 guards['type'] = "application"
280                 guards['name'] = self.get('command')
281
282                 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
283
284             if self._start_cnt > confirmation_counter:
285                 msg = "Couldn't retrieve the confirmation that the application started"
286                 self.error(msg)
287                 raise RuntimeError, msg
288
289             res = self.check_start(self._topic_app)
290             if not res:
291                 self._start_cnt +=1
292                 self.ec.schedule(reschedule_check, self.start)
293                 return
294
295         super(OMFApplication, self).do_start()
296
297     def check_start(self, uid):
298         """ Check, through the mail box in the parser, 
299         if the confirmation of the start has been received
300
301         :param uid: the id of the original message
302         :type guid: string
303
304         """
305         res = self._omf_api.check_mailbox("started", uid)
306         if res : 
307             return True
308         return False
309
310     def do_stop(self):
311         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
312         kill the application. 
313         State is set to STOPPED after the message is sent.
314
315         """
316         if self.get('version') == 5:
317             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
318         super(OMFApplication, self).do_stop()
319
320     def check_release(self, cid):
321         """ Check, through the mail box in the parser, 
322         if the confirmation of the release has been received
323
324         :param cid: the id of the original message
325         :type guid: string
326
327         """
328         res = self._omf_api.check_mailbox("release", cid)
329         if res : 
330             return res
331         return False
332
333     def do_release(self):
334         """ Clean the RM at the end of the experiment and release the API.
335
336         """
337         if self._omf_api:
338             if self.get('version') == "6":
339                 if not self.release_id:
340                     self.release_id = os.urandom(16).encode('hex')
341                     self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
342     
343                 if self._release_cnt < confirmation_counter:
344                     cid = self.check_release(self.release_id)
345                     if not cid:
346                         self._release_cnt +=1
347                         self.ec.schedule(reschedule_check, self.release)
348                         return
349                 else:
350                     msg = "Couldn't retrieve the confirmation of the release"
351                     self.error(msg)
352
353
354             OMFAPIFactory.release_api(self.get('version'), 
355               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
356                self.get('xmppPassword'), exp_id = self.exp_id)
357
358         super(OMFApplication, self).do_release()
359