7a87d2a01ea8548a49f6e50b6e74e2746fbc6dbc
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import os
22
23 from nepi.util.timefuncs import tnow
24 from nepi.execution.resource import ResourceManager, clsinit_copy, \
25         ResourceState, reschedule_delay
26 from nepi.execution.trace import Trace, TraceAttr
27 from nepi.execution.attribute import Attribute, Flags 
28 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
29 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
30 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
31
32 from nepi.util import sshfuncs
33
34 @clsinit_copy
35 class OMFApplication(OMFResource):
36     """
37     .. class:: Class Args :
38       
39         :param ec: The Experiment controller
40         :type ec: ExperimentController
41         :param guid: guid of the RM
42         :type guid: int
43
44     """
45     _rtype = "OMFApplication"
46     _authorized_connections = ["OMFNode", "WilabtSfaNode"]
47
48     @classmethod
49     def _register_attributes(cls):
50         """ Register the attributes of an OMF application
51
52         """
53         command = Attribute("command", "Command to execute")
54         env = Attribute("env", "Environnement variable of the application")
55
56         # For OMF 5:
57         appid = Attribute("appid", "Name of the application")
58         stdin = Attribute("stdin", "Input of the application", default = "")
59         sources = Attribute("sources", "Sources of the application", 
60                      flags = Flags.Design)
61         sshuser = Attribute("sshUser", "user to connect with ssh", 
62                      flags = Flags.Design)
63         sshkey = Attribute("sshKey", "key to use for ssh", 
64                      flags = Flags.Design)
65
66         cls._register_attribute(appid)
67         cls._register_attribute(command)
68         cls._register_attribute(env)
69         cls._register_attribute(stdin)
70         cls._register_attribute(sources)
71         cls._register_attribute(sshuser)
72         cls._register_attribute(sshkey)
73
74     def __init__(self, ec, guid):
75         """
76         :param ec: The Experiment controller
77         :type ec: ExperimentController
78         :param guid: guid of the RM
79         :type guid: int
80         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
81         :type creds: dict
82
83         """
84         super(OMFApplication, self).__init__(ec, guid)
85
86         self.set('command', "")
87         self.set('appid', "")
88         self._path= ""
89         self._args = ""
90         self.set('env', "")
91
92         self._node = None
93
94         self._omf_api = None
95         self._topic_app = None
96         self.create_id = None
97         self._create_cnt = 0
98         self._start_cnt = 0
99         self.release_id = None
100         self._release_cnt = 0
101
102         # For performance tests
103         self.begin_deploy_time = None
104         self.begin_start_time = None
105         self.begin_release_time = None
106         self.dperf = True
107         self.sperf = True
108         self.rperf = True
109
110         self.add_set_hook()
111
112     def _init_command(self):
113         comm = self.get('command').split(' ')
114         self._path= comm[0]
115         if len(comm)>1:
116             self._args = ' '.join(comm[1:])
117
118     @property
119     def exp_id(self):
120         return self.ec.exp_id
121
122     @property
123     def node(self):
124         rm_list = self.get_connected(OMFNode.get_rtype())
125         if rm_list: return rm_list[0]
126         return None
127
128     def stdin_hook(self, old_value, new_value):
129         """ Set a hook to the stdin attribute in order to send a message at each time
130         the value of this parameter is changed. Used ofr OMF 5.4 only
131
132         """
133         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
134         return new_value
135
136     def add_set_hook(self):
137         """ Initialize the hooks for OMF 5.4 only
138
139         """
140         attr = self._attrs["stdin"]
141         attr.set_hook = self.stdin_hook
142
143     def valid_connection(self, guid):
144         """ Check if the connection with the guid in parameter is possible. 
145         Only meaningful connections are allowed.
146
147         :param guid: Guid of RM it will be connected
148         :type guid: int
149         :rtype:  Boolean
150
151         """
152         rm = self.ec.get_resource(guid)
153         if rm.get_rtype() not in self._authorized_connections:
154             msg = ("Connection between %s %s and %s %s refused: "
155                     "An Application can be connected only to a Node" ) % \
156                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
157             self.debug(msg)
158
159             return False
160
161         elif len(self.connections) != 0 :
162             msg = ("Connection between %s %s and %s %s refused: "
163                     "This Application is already connected" ) % \
164                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
165             self.debug(msg)
166
167             return False
168
169         else :
170             msg = "Connection between %s %s and %s %s accepted" % (
171                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
172             self.debug(msg)
173
174             return True
175
176     def do_deploy(self):
177         """ Deploy the RM. It means nothing special for an application 
178         for now (later it will be upload sources, ...)
179         It becomes DEPLOYED after the topic for the application has been created
180
181         """
182         if not self.node or self.node.state < ResourceState.READY:
183             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
184                        % self.node.state )
185             self.ec.schedule(reschedule_delay, self.deploy)
186             return
187
188         ## For performance test
189         if self.dperf:
190             self.begin_deploy_time = tnow()
191             self.dperf = False
192
193         self._init_command()
194
195         self.set('xmppUser',self.node.get('xmppUser'))
196         self.set('xmppServer',self.node.get('xmppServer'))
197         self.set('xmppPort',self.node.get('xmppPort'))
198         self.set('xmppPassword',self.node.get('xmppPassword'))
199         self.set('version',self.node.get('version'))
200
201         if not self.get('xmppServer'):
202             msg = "XmppServer is not initialzed. XMPP Connections impossible"
203             self.error(msg)
204             raise RuntimeError, msg
205
206         if not (self.get('xmppUser') or self.get('xmppPort') 
207                    or self.get('xmppPassword')):
208             msg = "Credentials are not all initialzed. Default values will be used"
209             self.warn(msg)
210
211         if not self.get('command') :
212             msg = "Application's Command is not initialized"
213             self.error(msg)
214             raise RuntimeError, msg
215
216         if not self._omf_api :
217             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
218               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
219                self.get('xmppPassword'), exp_id = self.exp_id)
220
221         if self.get('version') == "5":
222
223             self.begin_deploy_time = tnow()
224
225             if self.get('sources'):
226                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
227                 user = self.get('sshUser') or self.get('xmppUser')
228                 dst = user + "@"+ gateway + ":"
229                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
230         else :
231             # For OMF 6 :
232             if not self.create_id:
233                 props = {}
234                 if self.get('command'):
235                     props['application:binary_path'] = self.get('command')
236                     props['application:hrn'] = self.get('command')
237                     props['application:membership'] = self._topic_app
238                 props['application:type'] = "application"
239     
240                 self.create_id = os.urandom(16).encode('hex')
241                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
242    
243             if self._create_cnt > confirmation_counter:
244                 msg = "Couldn't retrieve the confirmation of the creation"
245                 self.error(msg)
246                 raise RuntimeError, msg
247
248             uid = self.check_deploy(self.create_id)
249             if not uid:
250                 self._create_cnt +=1
251                 self.ec.schedule(reschedule_check, self.deploy)
252                 return
253
254             self._topic_app = uid
255             self._omf_api.enroll_topic(self._topic_app)
256
257         super(OMFApplication, self).do_deploy()
258
259     def check_deploy(self, cid):
260         """ Check, through the mail box in the parser, 
261         if the confirmation of the creation has been received
262
263         :param cid: the id of the original message
264         :type guid: string
265
266         """
267         uid = self._omf_api.check_mailbox("create", cid)
268         if uid : 
269             return uid
270         return False
271
272     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
273         self.info("Retrieving '%s' trace %s " % (name, attr))
274         if name == 'stdout' :
275             suffix = '.out'
276         elif name == 'stderr' :
277             suffix = '.err'
278         else :
279             suffix = '.misc'
280
281         trace_path = '/tmp/'+ self._topic_app + suffix
282
283         if attr == TraceAttr.PATH:
284             return trace_path
285
286         if attr == TraceAttr.ALL:
287             try:
288                 f = open(trace_path ,'r')
289             except IOError:
290                 print "File with traces has not been found"
291                 return False
292             out = f.read()
293             f.close()
294         return out
295
296
297     def do_start(self):
298         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
299          to execute the application. 
300
301         """
302         ## For performance test
303         if self.sperf:
304             self.begin_start_time = tnow()
305             self.sperf = False
306
307         if not self.get('env'):
308             self.set('env', " ")
309
310         if self.get('version') == "5":
311             self.begin_start_time = tnow()
312             # Some information to check the command for OMF5
313             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
314                 self.get('appid') + " : " + self._path + " : " + \
315                 self._args + " : " + self.get('env')
316             self.debug(msg)
317
318             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
319                 self._args, self._path, self.get('env'))
320         else:
321             #For OMF 6
322             if self._start_cnt == 0:
323                 props = {}
324                 props['state'] = "running"
325     
326                 guards = {}
327                 guards['type'] = "application"
328                 guards['name'] = self.get('command')
329
330                 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
331
332             if self._start_cnt > confirmation_counter:
333                 msg = "Couldn't retrieve the confirmation that the application started"
334                 self.error(msg)
335                 raise RuntimeError, msg
336
337             res = self.check_start(self._topic_app)
338             if not res:
339                 self._start_cnt +=1
340                 self.ec.schedule(reschedule_check, self.start)
341                 return
342
343         super(OMFApplication, self).do_start()
344
345     def check_start(self, uid):
346         """ Check, through the mail box in the parser, 
347         if the confirmation of the start has been received
348
349         :param uid: the id of the original message
350         :type guid: string
351
352         """
353         res = self._omf_api.check_mailbox("started", uid)
354         if res : 
355             return True
356         return False
357
358     def do_stop(self):
359         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
360         kill the application. 
361         State is set to STOPPED after the message is sent.
362
363         """
364
365
366         if self.get('version') == 5:
367             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
368         super(OMFApplication, self).do_stop()
369
370     def check_release(self, cid):
371         """ Check, through the mail box in the parser, 
372         if the confirmation of the release has been received
373
374         :param cid: the id of the original message
375         :type guid: string
376
377         """
378         res = self._omf_api.check_mailbox("release", cid)
379         if res : 
380             return res
381         return False
382
383     def do_release(self):
384         """ Clean the RM at the end of the experiment and release the API.
385
386         """
387         ## For performance test
388         if self.rperf:
389             self.begin_release_time = tnow()
390             self.rperf = False
391
392         if self._omf_api:
393             if self.get('version') == "6" and self._topic_app:
394                 if not self.release_id:
395                     self.release_id = os.urandom(16).encode('hex')
396                     self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
397     
398                 if self._release_cnt < confirmation_counter:
399                     cid = self.check_release(self.release_id)
400                     if not cid:
401                         self._release_cnt +=1
402                         self.ec.schedule(reschedule_check, self.release)
403                         return
404                 else:
405                     msg = "Couldn't retrieve the confirmation of the release"
406                     self.error(msg)
407
408                 # Remove the stdout and stderr of the application
409                 try:
410                     os.remove('/tmp/'+self._topic_app +'.out')
411                     os.remove('/tmp/'+self._topic_app +'.err')
412                 except OSError:
413                     pass
414
415             OMFAPIFactory.release_api(self.get('version'), 
416               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
417                self.get('xmppPassword'), exp_id = self.exp_id)
418
419         super(OMFApplication, self).do_release()
420