eef5297c49eeb1e99012e0961a1cd868eeabb690
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Julien Tribino <julien.tribino@inria.fr>
19
20 import os
21
22 from nepi.util.timefuncs import tnow
23 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24         ResourceState
25 from nepi.execution.trace import Trace, TraceAttr
26 from nepi.execution.attribute import Attribute, Flags 
27 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
28 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
29 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
30
31 from nepi.util import sshfuncs
32
33 @clsinit_copy
34 class OMFApplication(OMFResource):
35     """
36     .. class:: Class Args :
37       
38         :param ec: The Experiment controller
39         :type ec: ExperimentController
40         :param guid: guid of the RM
41         :type guid: int
42
43     """
44     _rtype = "omf::Application"
45     _authorized_connections = ["omf::Node", "wilabt::sfa::Node"]
46
47     @classmethod
48     def _register_attributes(cls):
49         """ Register the attributes of an OMF application
50
51         """
52         command = Attribute("command", "Command to execute")
53         env = Attribute("env", "Environnement variable of the application")
54
55         # For OMF 5:
56         appid = Attribute("appid", "Name of the application")
57         stdin = Attribute("stdin", "Input of the application", default = "")
58         sources = Attribute("sources", "Sources of the application", 
59                      flags = Flags.Design)
60         sshuser = Attribute("sshUser", "user to connect with ssh", 
61                      flags = Flags.Design)
62         sshkey = Attribute("sshKey", "key to use for ssh", 
63                      flags = Flags.Design)
64
65         cls._register_attribute(appid)
66         cls._register_attribute(command)
67         cls._register_attribute(env)
68         cls._register_attribute(stdin)
69         cls._register_attribute(sources)
70         cls._register_attribute(sshuser)
71         cls._register_attribute(sshkey)
72
73     def __init__(self, ec, guid):
74         """
75         :param ec: The Experiment controller
76         :type ec: ExperimentController
77         :param guid: guid of the RM
78         :type guid: int
79         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
80         :type creds: dict
81
82         """
83         super(OMFApplication, self).__init__(ec, guid)
84
85         self.set('command', "")
86         self.set('appid', "")
87         self._path= ""
88         self._args = ""
89         self.set('env', "")
90
91         self._node = None
92
93         self._omf_api = None
94         self._topic_app = None
95         self.create_id = None
96         self._create_cnt = 0
97         self._start_cnt = 0
98         self.release_id = None
99         self._release_cnt = 0
100
101         # For performance tests
102         self.begin_deploy_time = None
103         self.begin_start_time = None
104         self.begin_release_time = None
105         self.dperf = True
106         self.sperf = True
107         self.rperf = True
108
109         self.add_set_hook()
110
111     def _init_command(self):
112         comm = self.get('command').split(' ')
113         self._path= comm[0]
114         if len(comm)>1:
115             self._args = ' '.join(comm[1:])
116
117     @property
118     def exp_id(self):
119         return self.ec.exp_id
120
121     @property
122     def node(self):
123         rm_list = self.get_connected(OMFNode.get_rtype())
124         if rm_list: return rm_list[0]
125         return None
126
127     def stdin_hook(self, old_value, new_value):
128         """ Set a hook to the stdin attribute in order to send a message at each time
129         the value of this parameter is changed. Used ofr OMF 5.4 only
130
131         """
132         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
133         return new_value
134
135     def add_set_hook(self):
136         """ Initialize the hooks for OMF 5.4 only
137
138         """
139         attr = self._attrs["stdin"]
140         attr.set_hook = self.stdin_hook
141
142     def valid_connection(self, guid):
143         """ Check if the connection with the guid in parameter is possible. 
144         Only meaningful connections are allowed.
145
146         :param guid: Guid of RM it will be connected
147         :type guid: int
148         :rtype:  Boolean
149
150         """
151         rm = self.ec.get_resource(guid)
152         if rm.get_rtype() not in self._authorized_connections:
153             msg = ("Connection between %s %s and %s %s refused: "
154                     "An Application can be connected only to a Node" ) % \
155                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
156             self.debug(msg)
157
158             return False
159
160         elif len(self.connections) != 0 :
161             msg = ("Connection between %s %s and %s %s refused: "
162                     "This Application is already connected" ) % \
163                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
164             self.debug(msg)
165
166             return False
167
168         else :
169             msg = "Connection between %s %s and %s %s accepted" % (
170                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
171             self.debug(msg)
172
173             return True
174
175     def do_deploy(self):
176         """ Deploy the RM. It means nothing special for an application 
177         for now (later it will be upload sources, ...)
178         It becomes DEPLOYED after the topic for the application has been created
179
180         """
181         if not self.node or self.node.state < ResourceState.READY:
182             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
183                        % self.node.state )
184             self.ec.schedule(self.reschedule_delay, self.deploy)
185             return
186
187         ## For performance test
188         if self.dperf:
189             self.begin_deploy_time = tnow()
190             self.dperf = False
191
192         self._init_command()
193
194         self.set('xmppUser',self.node.get('xmppUser'))
195         self.set('xmppServer',self.node.get('xmppServer'))
196         self.set('xmppPort',self.node.get('xmppPort'))
197         self.set('xmppPassword',self.node.get('xmppPassword'))
198         self.set('version',self.node.get('version'))
199
200         if not self.get('xmppServer'):
201             msg = "XmppServer is not initialzed. XMPP Connections impossible"
202             self.error(msg)
203             raise RuntimeError, msg
204
205         if not (self.get('xmppUser') or self.get('xmppPort') 
206                    or self.get('xmppPassword')):
207             msg = "Credentials are not all initialzed. Default values will be used"
208             self.warn(msg)
209
210         if not self.get('command') :
211             msg = "Application's Command is not initialized"
212             self.error(msg)
213             raise RuntimeError, msg
214
215         if not self._omf_api :
216             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
217               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
218                self.get('xmppPassword'), exp_id = self.exp_id)
219
220         if self.get('version') == "5":
221
222             self.begin_deploy_time = tnow()
223
224             if self.get('sources'):
225                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
226                 user = self.get('sshUser') or self.get('xmppUser')
227                 dst = user + "@"+ gateway + ":"
228                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
229         else :
230             # For OMF 6 :
231             if not self.create_id:
232                 props = {}
233                 if self.get('command'):
234                     props['application:binary_path'] = self.get('command')
235                     props['application:hrn'] = self.get('command')
236                     props['application:membership'] = self._topic_app
237                 props['application:type'] = "application"
238     
239                 self.create_id = os.urandom(16).encode('hex')
240                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
241    
242             if self._create_cnt > confirmation_counter:
243                 msg = "Couldn't retrieve the confirmation of the creation"
244                 self.error(msg)
245                 raise RuntimeError, msg
246
247             uid = self.check_deploy(self.create_id)
248             if not uid:
249                 self._create_cnt +=1
250                 self.ec.schedule(reschedule_check, self.deploy)
251                 return
252
253             self._topic_app = uid
254             self._omf_api.enroll_topic(self._topic_app)
255
256         super(OMFApplication, self).do_deploy()
257
258     def check_deploy(self, cid):
259         """ Check, through the mail box in the parser, 
260         if the confirmation of the creation has been received
261
262         :param cid: the id of the original message
263         :type guid: string
264
265         """
266         uid = self._omf_api.check_mailbox("create", cid)
267         if uid : 
268             return uid
269         return False
270
271     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
272         self.info("Retrieving '%s' trace %s " % (name, attr))
273         if name == 'stdout' :
274             suffix = '.out'
275         elif name == 'stderr' :
276             suffix = '.err'
277         else :
278             suffix = '.misc'
279
280         trace_path = '/tmp/'+ self._topic_app + suffix
281
282         if attr == TraceAttr.PATH:
283             return trace_path
284
285         if attr == TraceAttr.ALL:
286             try:
287                 f = open(trace_path ,'r')
288             except IOError:
289                 print "File with traces has not been found"
290                 return False
291             out = f.read()
292             f.close()
293         return out
294
295
296     def do_start(self):
297         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
298          to execute the application. 
299
300         """
301         ## For performance test
302         if self.sperf:
303             self.begin_start_time = tnow()
304             self.sperf = False
305
306         if not self.get('env'):
307             self.set('env', " ")
308
309         if self.get('version') == "5":
310             self.begin_start_time = tnow()
311             # Some information to check the command for OMF5
312             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
313                 self.get('appid') + " : " + self._path + " : " + \
314                 self._args + " : " + self.get('env')
315             self.debug(msg)
316
317             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
318                 self._args, self._path, self.get('env'))
319         else:
320             #For OMF 6
321             if self._start_cnt == 0:
322                 props = {}
323                 props['state'] = "running"
324     
325                 guards = {}
326                 guards['type'] = "application"
327                 guards['name'] = self.get('command')
328
329                 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
330
331             if self._start_cnt > confirmation_counter:
332                 msg = "Couldn't retrieve the confirmation that the application started"
333                 self.error(msg)
334                 raise RuntimeError, msg
335
336             res = self.check_start(self._topic_app)
337             if not res:
338                 self._start_cnt +=1
339                 self.ec.schedule(reschedule_check, self.start)
340                 return
341
342         super(OMFApplication, self).do_start()
343
344     def check_start(self, uid):
345         """ Check, through the mail box in the parser, 
346         if the confirmation of the start has been received
347
348         :param uid: the id of the original message
349         :type guid: string
350
351         """
352         res = self._omf_api.check_mailbox("started", uid)
353         if res : 
354             return True
355         return False
356
357     def do_stop(self):
358         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
359         kill the application. 
360         State is set to STOPPED after the message is sent.
361
362         """
363
364
365         if self.get('version') == 5:
366             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
367         super(OMFApplication, self).do_stop()
368
369     def check_release(self, cid):
370         """ Check, through the mail box in the parser, 
371         if the confirmation of the release has been received
372
373         :param cid: the id of the original message
374         :type guid: string
375
376         """
377         res = self._omf_api.check_mailbox("release", cid)
378         if res : 
379             return res
380         return False
381
382     def do_release(self):
383         """ Clean the RM at the end of the experiment and release the API.
384
385         """
386         ## For performance test
387         if self.rperf:
388             self.begin_release_time = tnow()
389             self.rperf = False
390
391         if self._omf_api:
392             if self.get('version') == "6" and self._topic_app:
393                 if not self.release_id:
394                     self.release_id = os.urandom(16).encode('hex')
395                     self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
396     
397                 if self._release_cnt < confirmation_counter:
398                     cid = self.check_release(self.release_id)
399                     if not cid:
400                         self._release_cnt +=1
401                         self.ec.schedule(reschedule_check, self.release)
402                         return
403                 else:
404                     msg = "Couldn't retrieve the confirmation of the release"
405                     self.error(msg)
406
407                 # Remove the stdout and stderr of the application
408                 try:
409                     os.remove('/tmp/'+self._topic_app +'.out')
410                     os.remove('/tmp/'+self._topic_app +'.err')
411                 except OSError:
412                     pass
413
414             OMFAPIFactory.release_api(self.get('version'), 
415               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
416                self.get('xmppPassword'), exp_id = self.exp_id)
417
418         super(OMFApplication, self).do_release()
419