b66d2a503215e4af6859ae9b0704f205af1b8c59
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import os
22
23 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24         ResourceState, reschedule_delay
25 from nepi.execution.attribute import Attribute, Flags 
26 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
27 from nepi.resources.omf.node import OMFNode, confirmation_counter
28 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
29
30 from nepi.util import sshfuncs
31
32 @clsinit_copy
33 class OMFApplication(OMFResource):
34     """
35     .. class:: Class Args :
36       
37         :param ec: The Experiment controller
38         :type ec: ExperimentController
39         :param guid: guid of the RM
40         :type guid: int
41
42     """
43     _rtype = "OMFApplication"
44     _authorized_connections = ["OMFNode"]
45
46     @classmethod
47     def _register_attributes(cls):
48         """ Register the attributes of an OMF application
49
50         """
51         command = Attribute("command", "Command to execute")
52         env = Attribute("env", "Environnement variable of the application")
53
54         # For OMF 5:
55         appid = Attribute("appid", "Name of the application")
56         stdin = Attribute("stdin", "Input of the application", default = "")
57         sources = Attribute("sources", "Sources of the application", 
58                      flags = Flags.ExecReadOnly)
59         sshuser = Attribute("sshUser", "user to connect with ssh", 
60                      flags = Flags.ExecReadOnly)
61         sshkey = Attribute("sshKey", "key to use for ssh", 
62                      flags = Flags.ExecReadOnly)
63
64         cls._register_attribute(appid)
65         cls._register_attribute(command)
66         cls._register_attribute(env)
67         cls._register_attribute(stdin)
68         cls._register_attribute(sources)
69         cls._register_attribute(sshuser)
70         cls._register_attribute(sshkey)
71
72     def __init__(self, ec, guid):
73         """
74         :param ec: The Experiment controller
75         :type ec: ExperimentController
76         :param guid: guid of the RM
77         :type guid: int
78         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
79         :type creds: dict
80
81         """
82         super(OMFApplication, self).__init__(ec, guid)
83
84         self.set('command', "")
85         self.set('appid', "")
86         self.path= ""
87         self.args = ""
88         self.set('env', "")
89
90         self._node = None
91
92         self._omf_api = None
93         self._topic_app = None
94         self.create_id = None
95         self._create_cnt = 0
96         self._start_cnt = 0
97         self.release_id = None
98         self._release_cnt = 0
99
100         self.add_set_hook()
101
102     def _init_command(self):
103         comm = self.get('command').split(' ')
104         self.path= comm[0]
105         if len(comm)>1:
106             self.args = ' '.join(comm[1:])
107
108     @property
109     def exp_id(self):
110         return self.ec.exp_id
111
112     @property
113     def node(self):
114         rm_list = self.get_connected(OMFNode.get_rtype())
115         if rm_list: return rm_list[0]
116         return None
117
118     def stdin_hook(self, old_value, new_value):
119         """ Set a hook to the stdin attribute in order to send a message at each time
120         the value of this parameter is changed
121
122         """
123         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
124         return new_value
125
126     def add_set_hook(self):
127         """ Initialize the hooks
128
129         """
130         attr = self._attrs["stdin"]
131         attr.set_hook = self.stdin_hook
132
133     def valid_connection(self, guid):
134         """ Check if the connection with the guid in parameter is possible. 
135         Only meaningful connections are allowed.
136
137         :param guid: Guid of RM it will be connected
138         :type guid: int
139         :rtype:  Boolean
140
141         """
142         rm = self.ec.get_resource(guid)
143         if rm.get_rtype() not in self._authorized_connections:
144             msg = ("Connection between %s %s and %s %s refused: "
145                     "An Application can be connected only to a Node" ) % \
146                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
147             self.debug(msg)
148
149             return False
150
151         elif len(self.connections) != 0 :
152             msg = ("Connection between %s %s and %s %s refused: "
153                     "This Application is already connected" ) % \
154                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
155             self.debug(msg)
156
157             return False
158
159         else :
160             msg = "Connection between %s %s and %s %s accepted" % (
161                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
162             self.debug(msg)
163
164             return True
165
166     def do_deploy(self):
167         """ Deploy the RM. It means nothing special for an application 
168         for now (later it will be upload sources, ...)
169         It becomes DEPLOYED after getting the xmpp client.
170
171         """
172         if not self.node or self.node.state < ResourceState.READY:
173             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
174                        % self.node.state )
175             self.ec.schedule(reschedule_delay, self.deploy)
176             return
177
178         self._init_command()
179
180         self.set('xmppUser',self.node.get('xmppUser'))
181         self.set('xmppServer',self.node.get('xmppServer'))
182         self.set('xmppPort',self.node.get('xmppPort'))
183         self.set('xmppPassword',self.node.get('xmppPassword'))
184         self.set('version',self.node.get('version'))
185
186         if not self.get('xmppServer'):
187             msg = "XmppServer is not initialzed. XMPP Connections impossible"
188             self.error(msg)
189             raise RuntimeError, msg
190
191         if not (self.get('xmppUser') or self.get('xmppPort') 
192                    or self.get('xmppPassword')):
193             msg = "Credentials are not all initialzed. Default values will be used"
194             self.warn(msg)
195
196         if not self._omf_api :
197             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
198               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
199                self.get('xmppPassword'), exp_id = self.exp_id)
200
201         if self.get('version') == "5":
202             if self.get('sources'):
203                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
204                 user = self.get('sshUser') or self.get('xmppUser')
205                 dst = user + "@"+ gateway + ":"
206                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
207         else :
208             # For OMF 6 :
209             if not self.create_id:
210                 props = {}
211                 if self.get('command'):
212                     props['application:binary_path'] = self.get('command')
213                     props['application:hrn'] = self.get('command')
214                     props['application:membership'] = self._topic_app
215                 props['application:type'] = "application"
216     
217                 self.create_id = os.urandom(16).encode('hex')
218                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
219    
220             if self._create_cnt > confirmation_counter:
221                 msg = "Couldn't retrieve the confirmation of the creation"
222                 self.error(msg)
223                 raise RuntimeError, msg
224
225             uid = self.check_deploy(self.create_id)
226             if not uid:
227                 self._create_cnt +=1
228                 self.ec.schedule(reschedule_delay, self.deploy)
229                 return
230
231             self._topic_app = uid
232             self._omf_api.enroll_topic(self._topic_app)
233
234         super(OMFApplication, self).do_deploy()
235
236     def check_deploy(self, cid):
237         uid = self._omf_api.check_mailbox("create", cid)
238         if uid : 
239             return uid
240         return False
241
242     def do_start(self):
243         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
244          to execute the application. 
245          It becomes STARTED before the messages are sent (for coordination)
246
247         """
248         if not self.get('command') :
249             msg = "Application's Command is not initialized"
250             self.error(msg)
251             raise RuntimeError, msg
252
253         if not self.get('env'):
254             self.set('env', " ")
255
256         if self.get('version') == "5":
257             # Some information to check the command for OMF5
258             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
259                 self.get('appid') + " : " + self.path + " : " + \
260                 self.args + " : " + self.get('env')
261             self.debug(msg)
262
263             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
264                 self.get('args'), self.get('path'), self.get('env'))
265         else:
266             #For OMF 6
267             if self._start_cnt == 0:
268                 props = {}
269                 props['state'] = "running"
270     
271                 guards = {}
272                 guards['type'] = "application"
273                 guards['name'] = self.get('command')
274
275                 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
276
277             if self._start_cnt > confirmation_counter:
278                 msg = "Couldn't retrieve the confirmation that the application started"
279                 self.error(msg)
280                 raise RuntimeError, msg
281
282             res = self.check_start(self._topic_app)
283             if not res:
284                 self._start_cnt +=1
285                 self.ec.schedule(reschedule_delay, self.start)
286                 return
287
288         super(OMFApplication, self).do_start()
289
290     def check_start(self, uid):
291         res = self._omf_api.check_mailbox("started", uid)
292         if res : 
293             return True
294         return False
295
296     def do_stop(self):
297         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
298         kill the application. 
299         State is set to STOPPED after the message is sent.
300
301         """
302         if self.get('version') == 5:
303             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
304         super(OMFApplication, self).do_stop()
305
306     def check_release(self, cid):
307         res = self._omf_api.check_mailbox("release", cid)
308         if res : 
309             return res
310         return False
311
312     def do_release(self):
313         """ Clean the RM at the end of the experiment and release the API.
314
315         """
316         if self.get('version') == "6":
317             if not self.release_id:
318                 self.release_id = os.urandom(16).encode('hex')
319                 self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
320     
321             if self._release_cnt < confirmation_counter:
322                 cid = self.check_release(self.release_id)
323                 if not cid:
324                     self._release_cnt +=1
325                     self.ec.schedule(reschedule_delay, self.release)
326                     return
327             else:
328                 msg = "Couldn't retrieve the confirmation of the release"
329                 self.error(msg)
330
331         if self._omf_api:
332             OMFAPIFactory.release_api(self.get('version'), 
333               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
334                self.get('xmppPassword'), exp_id = self.exp_id)
335
336         super(OMFApplication, self).do_release()
337