6af1d89d92052fb5012f547fdbdffdfa3c419983
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import os
22
23 from nepi.util.timefuncs import tnow
24 from nepi.execution.resource import ResourceManager, clsinit_copy, \
25         ResourceState, reschedule_delay
26 from nepi.execution.attribute import Attribute, Flags 
27 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
28 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
29 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
30
31 from nepi.util import sshfuncs
32
33 @clsinit_copy
34 class OMFApplication(OMFResource):
35     """
36     .. class:: Class Args :
37       
38         :param ec: The Experiment controller
39         :type ec: ExperimentController
40         :param guid: guid of the RM
41         :type guid: int
42
43     """
44     _rtype = "OMFApplication"
45     _authorized_connections = ["OMFNode", "WilabtSfaNode"]
46
47     @classmethod
48     def _register_attributes(cls):
49         """ Register the attributes of an OMF application
50
51         """
52         command = Attribute("command", "Command to execute")
53         env = Attribute("env", "Environnement variable of the application")
54
55         # For OMF 5:
56         appid = Attribute("appid", "Name of the application")
57         stdin = Attribute("stdin", "Input of the application", default = "")
58         sources = Attribute("sources", "Sources of the application", 
59                      flags = Flags.Design)
60         sshuser = Attribute("sshUser", "user to connect with ssh", 
61                      flags = Flags.Design)
62         sshkey = Attribute("sshKey", "key to use for ssh", 
63                      flags = Flags.Design)
64
65         cls._register_attribute(appid)
66         cls._register_attribute(command)
67         cls._register_attribute(env)
68         cls._register_attribute(stdin)
69         cls._register_attribute(sources)
70         cls._register_attribute(sshuser)
71         cls._register_attribute(sshkey)
72
73     def __init__(self, ec, guid):
74         """
75         :param ec: The Experiment controller
76         :type ec: ExperimentController
77         :param guid: guid of the RM
78         :type guid: int
79         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
80         :type creds: dict
81
82         """
83         super(OMFApplication, self).__init__(ec, guid)
84
85         self.set('command', "")
86         self.set('appid', "")
87         self._path= ""
88         self._args = ""
89         self.set('env', "")
90
91         self._node = None
92
93         self._omf_api = None
94         self._topic_app = None
95         self.create_id = None
96         self._create_cnt = 0
97         self._start_cnt = 0
98         self.release_id = None
99         self._release_cnt = 0
100
101         # For performance tests
102         self.begin_deploy_time = None
103         self.begin_start_time = None
104         self.begin_release_time = None
105         self.dperf = True
106         self.sperf = True
107         self.rperf = True
108
109         self.add_set_hook()
110
111     def _init_command(self):
112         comm = self.get('command').split(' ')
113         self._path= comm[0]
114         if len(comm)>1:
115             self._args = ' '.join(comm[1:])
116
117     @property
118     def exp_id(self):
119         return self.ec.exp_id
120
121     @property
122     def node(self):
123         rm_list = self.get_connected(OMFNode.get_rtype())
124         if rm_list: return rm_list[0]
125         return None
126
127     def stdin_hook(self, old_value, new_value):
128         """ Set a hook to the stdin attribute in order to send a message at each time
129         the value of this parameter is changed. Used ofr OMF 5.4 only
130
131         """
132         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
133         return new_value
134
135     def add_set_hook(self):
136         """ Initialize the hooks for OMF 5.4 only
137
138         """
139         attr = self._attrs["stdin"]
140         attr.set_hook = self.stdin_hook
141
142     def valid_connection(self, guid):
143         """ Check if the connection with the guid in parameter is possible. 
144         Only meaningful connections are allowed.
145
146         :param guid: Guid of RM it will be connected
147         :type guid: int
148         :rtype:  Boolean
149
150         """
151         rm = self.ec.get_resource(guid)
152         if rm.get_rtype() not in self._authorized_connections:
153             msg = ("Connection between %s %s and %s %s refused: "
154                     "An Application can be connected only to a Node" ) % \
155                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
156             self.debug(msg)
157
158             return False
159
160         elif len(self.connections) != 0 :
161             msg = ("Connection between %s %s and %s %s refused: "
162                     "This Application is already connected" ) % \
163                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
164             self.debug(msg)
165
166             return False
167
168         else :
169             msg = "Connection between %s %s and %s %s accepted" % (
170                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
171             self.debug(msg)
172
173             return True
174
175     def do_deploy(self):
176         """ Deploy the RM. It means nothing special for an application 
177         for now (later it will be upload sources, ...)
178         It becomes DEPLOYED after the topic for the application has been created
179
180         """
181         if not self.node or self.node.state < ResourceState.READY:
182             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
183                        % self.node.state )
184             self.ec.schedule(reschedule_delay, self.deploy)
185             return
186
187         ## For performance test
188         if self.dperf:
189             self.begin_deploy_time = tnow()
190             self.dperf = False
191
192         self._init_command()
193
194         self.set('xmppUser',self.node.get('xmppUser'))
195         self.set('xmppServer',self.node.get('xmppServer'))
196         self.set('xmppPort',self.node.get('xmppPort'))
197         self.set('xmppPassword',self.node.get('xmppPassword'))
198         self.set('version',self.node.get('version'))
199
200         if not self.get('xmppServer'):
201             msg = "XmppServer is not initialzed. XMPP Connections impossible"
202             self.error(msg)
203             raise RuntimeError, msg
204
205         if not (self.get('xmppUser') or self.get('xmppPort') 
206                    or self.get('xmppPassword')):
207             msg = "Credentials are not all initialzed. Default values will be used"
208             self.warn(msg)
209
210         if not self.get('command') :
211             msg = "Application's Command is not initialized"
212             self.error(msg)
213             raise RuntimeError, msg
214
215         if not self._omf_api :
216             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
217               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
218                self.get('xmppPassword'), exp_id = self.exp_id)
219
220         if self.get('version') == "5":
221
222             self.begin_deploy_time = tnow()
223
224             if self.get('sources'):
225                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
226                 user = self.get('sshUser') or self.get('xmppUser')
227                 dst = user + "@"+ gateway + ":"
228                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
229         else :
230             # For OMF 6 :
231             if not self.create_id:
232                 props = {}
233                 if self.get('command'):
234                     props['application:binary_path'] = self.get('command')
235                     props['application:hrn'] = self.get('command')
236                     props['application:membership'] = self._topic_app
237                 props['application:type'] = "application"
238     
239                 self.create_id = os.urandom(16).encode('hex')
240                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
241    
242             if self._create_cnt > confirmation_counter:
243                 msg = "Couldn't retrieve the confirmation of the creation"
244                 self.error(msg)
245                 raise RuntimeError, msg
246
247             uid = self.check_deploy(self.create_id)
248             if not uid:
249                 self._create_cnt +=1
250                 self.ec.schedule(reschedule_check, self.deploy)
251                 return
252
253             self._topic_app = uid
254             self._omf_api.enroll_topic(self._topic_app)
255
256         super(OMFApplication, self).do_deploy()
257
258     def check_deploy(self, cid):
259         """ Check, through the mail box in the parser, 
260         if the confirmation of the creation has been received
261
262         :param cid: the id of the original message
263         :type guid: string
264
265         """
266         uid = self._omf_api.check_mailbox("create", cid)
267         if uid : 
268             return uid
269         return False
270
271     def trace_filepath(self, filename):
272         return os.path.join('~/', filename)
273
274     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
275         self.info("Retrieving '%s' trace %s " % (name, attr))
276
277         path = self.trace_filepath(str(self.guid) + '_' + name)
278         
279         command = "(test -f %s && echo 'success') || echo 'error'" % path
280         (out, err), proc = self.node.execute(command)
281
282         if (err and proc.poll()) or out.find("error") != -1:
283             msg = " Couldn't find trace %s " % name
284             self.error(msg, out, err)
285             return None
286     
287         if attr == TraceAttr.PATH:
288             return path
289
290         if attr == TraceAttr.ALL:
291             (out, err), proc = self.node.check_output(self.run_home, name)
292             
293             if proc.poll():
294                 msg = " Couldn't read trace %s " % name
295                 self.error(msg, out, err)
296                 return None
297
298             return out
299
300         return out
301
302     def check_output(self, home, filename):
303         """ Retrives content of file """
304         (out, err), proc = self.execute("cat %s" % 
305             os.path.join(home, filename), retry = 1, with_lock = True)
306         return (out, err), proc
307
308
309     def do_start(self):
310         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
311          to execute the application. 
312
313         """
314         ## For performance test
315         if self.sperf:
316             self.begin_start_time = tnow()
317             self.sperf = False
318
319         if not self.get('env'):
320             self.set('env', " ")
321
322         if self.get('version') == "5":
323             self.begin_start_time = tnow()
324             # Some information to check the command for OMF5
325             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
326                 self.get('appid') + " : " + self._path + " : " + \
327                 self._args + " : " + self.get('env')
328             self.debug(msg)
329
330             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
331                 self._args, self._path, self.get('env'))
332         else:
333             #For OMF 6
334             if self._start_cnt == 0:
335                 props = {}
336                 props['state'] = "running"
337     
338                 guards = {}
339                 guards['type'] = "application"
340                 guards['name'] = self.get('command')
341
342                 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
343
344             if self._start_cnt > confirmation_counter:
345                 msg = "Couldn't retrieve the confirmation that the application started"
346                 self.error(msg)
347                 raise RuntimeError, msg
348
349             res = self.check_start(self._topic_app)
350             if not res:
351                 self._start_cnt +=1
352                 self.ec.schedule(reschedule_check, self.start)
353                 return
354
355         super(OMFApplication, self).do_start()
356
357     def check_start(self, uid):
358         """ Check, through the mail box in the parser, 
359         if the confirmation of the start has been received
360
361         :param uid: the id of the original message
362         :type guid: string
363
364         """
365         res = self._omf_api.check_mailbox("started", uid)
366         if res : 
367             return True
368         return False
369
370     def do_stop(self):
371         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
372         kill the application. 
373         State is set to STOPPED after the message is sent.
374
375         """
376         if self.get('version') == 5:
377             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
378         super(OMFApplication, self).do_stop()
379
380     def check_release(self, cid):
381         """ Check, through the mail box in the parser, 
382         if the confirmation of the release has been received
383
384         :param cid: the id of the original message
385         :type guid: string
386
387         """
388         res = self._omf_api.check_mailbox("release", cid)
389         if res : 
390             return res
391         return False
392
393     def do_release(self):
394         """ Clean the RM at the end of the experiment and release the API.
395
396         """
397         ## For performance test
398         if self.rperf:
399             self.begin_release_time = tnow()
400             self.rperf = False
401
402         if self._omf_api:
403             if self.get('version') == "6" and self._topic_app:
404                 if not self.release_id:
405                     self.release_id = os.urandom(16).encode('hex')
406                     self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
407     
408                 if self._release_cnt < confirmation_counter:
409                     cid = self.check_release(self.release_id)
410                     if not cid:
411                         self._release_cnt +=1
412                         self.ec.schedule(reschedule_check, self.release)
413                         return
414                 else:
415                     msg = "Couldn't retrieve the confirmation of the release"
416                     self.error(msg)
417
418
419             OMFAPIFactory.release_api(self.get('version'), 
420               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
421                self.get('xmppPassword'), exp_id = self.exp_id)
422
423         super(OMFApplication, self).do_release()
424