applied the except and raise fixers to the master branch to close the gap with py3
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Julien Tribino <julien.tribino@inria.fr>
19
20 from __future__ import print_function
21
22 import os
23
24 from nepi.util.timefuncs import tnow
25 from nepi.execution.resource import ResourceManager, clsinit_copy, \
26         ResourceState
27 from nepi.execution.trace import Trace, TraceAttr
28 from nepi.execution.attribute import Attribute, Flags 
29 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
30 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
31 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
32
33 from nepi.util import sshfuncs
34
35 @clsinit_copy
36 class OMFApplication(OMFResource):
37     """
38     .. class:: Class Args :
39       
40         :param ec: The Experiment controller
41         :type ec: ExperimentController
42         :param guid: guid of the RM
43         :type guid: int
44
45     """
46     _rtype = "omf::Application"
47     _authorized_connections = ["omf::Node", "wilabt::sfa::Node"]
48
49     @classmethod
50     def _register_attributes(cls):
51         """ Register the attributes of an OMF application
52
53         """
54         command = Attribute("command", "Command to execute")
55         env = Attribute("env", "Environnement variable of the application")
56
57         # For OMF 5:
58         appid = Attribute("appid", "Name of the application")
59         stdin = Attribute("stdin", "Input of the application", default = "")
60         sources = Attribute("sources", "Sources of the application", 
61                      flags = Flags.Design)
62         sshuser = Attribute("sshUser", "user to connect with ssh", 
63                      flags = Flags.Design)
64         sshkey = Attribute("sshKey", "key to use for ssh", 
65                      flags = Flags.Design)
66
67         cls._register_attribute(appid)
68         cls._register_attribute(command)
69         cls._register_attribute(env)
70         cls._register_attribute(stdin)
71         cls._register_attribute(sources)
72         cls._register_attribute(sshuser)
73         cls._register_attribute(sshkey)
74
75     def __init__(self, ec, guid):
76         """
77         :param ec: The Experiment controller
78         :type ec: ExperimentController
79         :param guid: guid of the RM
80         :type guid: int
81         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
82         :type creds: dict
83
84         """
85         super(OMFApplication, self).__init__(ec, guid)
86
87         self.set('command', "")
88         self.set('appid', "")
89         self._path= ""
90         self._args = ""
91         self.set('env', "")
92
93         self._node = None
94
95         self._omf_api = None
96         self._topic_app = None
97         self.create_id = None
98         self._create_cnt = 0
99         self._start_cnt = 0
100         self.release_id = None
101         self._release_cnt = 0
102
103         # For performance tests
104         self.begin_deploy_time = None
105         self.begin_start_time = None
106         self.begin_release_time = None
107         self.dperf = True
108         self.sperf = True
109         self.rperf = True
110
111         self.add_set_hook()
112
113     def _init_command(self):
114         comm = self.get('command').split(' ')
115         self._path= comm[0]
116         if len(comm)>1:
117             self._args = ' '.join(comm[1:])
118
119     @property
120     def exp_id(self):
121         return self.ec.exp_id
122
123     @property
124     def node(self):
125         rm_list = self.get_connected(OMFNode.get_rtype())
126         if rm_list: return rm_list[0]
127         return None
128
129     def stdin_hook(self, old_value, new_value):
130         """ Set a hook to the stdin attribute in order to send a message at each time
131         the value of this parameter is changed. Used ofr OMF 5.4 only
132
133         """
134         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
135         return new_value
136
137     def add_set_hook(self):
138         """ Initialize the hooks for OMF 5.4 only
139
140         """
141         attr = self._attrs["stdin"]
142         attr.set_hook = self.stdin_hook
143
144     def valid_connection(self, guid):
145         """ Check if the connection with the guid in parameter is possible. 
146         Only meaningful connections are allowed.
147
148         :param guid: Guid of RM it will be connected
149         :type guid: int
150         :rtype:  Boolean
151
152         """
153         rm = self.ec.get_resource(guid)
154         if rm.get_rtype() not in self._authorized_connections:
155             msg = ("Connection between %s %s and %s %s refused: "
156                     "An Application can be connected only to a Node" ) % \
157                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
158             self.debug(msg)
159
160             return False
161
162         elif len(self.connections) != 0 :
163             msg = ("Connection between %s %s and %s %s refused: "
164                     "This Application is already connected" ) % \
165                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
166             self.debug(msg)
167
168             return False
169
170         else :
171             msg = "Connection between %s %s and %s %s accepted" % (
172                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
173             self.debug(msg)
174
175             return True
176
177     def do_deploy(self):
178         """ Deploy the RM. It means nothing special for an application 
179         for now (later it will be upload sources, ...)
180         It becomes DEPLOYED after the topic for the application has been created
181
182         """
183         if not self.node or self.node.state < ResourceState.READY:
184             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
185                        % self.node.state )
186             self.ec.schedule(self.reschedule_delay, self.deploy)
187             return
188
189         ## For performance test
190         if self.dperf:
191             self.begin_deploy_time = tnow()
192             self.dperf = False
193
194         self._init_command()
195
196         self.set('xmppUser',self.node.get('xmppUser'))
197         self.set('xmppServer',self.node.get('xmppServer'))
198         self.set('xmppPort',self.node.get('xmppPort'))
199         self.set('xmppPassword',self.node.get('xmppPassword'))
200         self.set('version',self.node.get('version'))
201
202         if not self.get('xmppServer'):
203             msg = "XmppServer is not initialzed. XMPP Connections impossible"
204             self.error(msg)
205             raise RuntimeError(msg)
206
207         if not (self.get('xmppUser') or self.get('xmppPort') 
208                    or self.get('xmppPassword')):
209             msg = "Credentials are not all initialzed. Default values will be used"
210             self.warn(msg)
211
212         if not self.get('command') :
213             msg = "Application's Command is not initialized"
214             self.error(msg)
215             raise RuntimeError(msg)
216
217         if not self._omf_api :
218             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
219               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
220                self.get('xmppPassword'), exp_id = self.exp_id)
221
222         if self.get('version') == "5":
223
224             self.begin_deploy_time = tnow()
225
226             if self.get('sources'):
227                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
228                 user = self.get('sshUser') or self.get('xmppUser')
229                 dst = user + "@"+ gateway + ":"
230                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
231         else :
232             # For OMF 6 :
233             if not self.create_id:
234                 props = {}
235                 if self.get('command'):
236                     props['application:binary_path'] = self.get('command')
237                     props['application:hrn'] = self.get('command')
238                     props['application:membership'] = self._topic_app
239                 props['application:type'] = "application"
240     
241                 self.create_id = os.urandom(16).encode('hex')
242                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
243    
244             if self._create_cnt > confirmation_counter:
245                 msg = "Couldn't retrieve the confirmation of the creation"
246                 self.error(msg)
247                 raise RuntimeError(msg)
248
249             uid = self.check_deploy(self.create_id)
250             if not uid:
251                 self._create_cnt +=1
252                 self.ec.schedule(reschedule_check, self.deploy)
253                 return
254
255             self._topic_app = uid
256             self._omf_api.enroll_topic(self._topic_app)
257
258         super(OMFApplication, self).do_deploy()
259
260     def check_deploy(self, cid):
261         """ Check, through the mail box in the parser, 
262         if the confirmation of the creation has been received
263
264         :param cid: the id of the original message
265         :type guid: string
266
267         """
268         uid = self._omf_api.check_mailbox("create", cid)
269         if uid : 
270             return uid
271         return False
272
273     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
274         self.info("Retrieving '%s' trace %s " % (name, attr))
275         if name == 'stdout' :
276             suffix = '.out'
277         elif name == 'stderr' :
278             suffix = '.err'
279         else :
280             suffix = '.misc'
281
282         trace_path = '/tmp/'+ self._topic_app + suffix
283
284         if attr == TraceAttr.PATH:
285             return trace_path
286
287         if attr == TraceAttr.ALL:
288             try:
289                 with open(trace_path ,'r') as f:
290                     return f.read()
291             except IOError:
292                 print("File with traces has not been found")
293                 return False
294
295
296     def do_start(self):
297         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
298          to execute the application. 
299
300         """
301         ## For performance test
302         if self.sperf:
303             self.begin_start_time = tnow()
304             self.sperf = False
305
306         if not self.get('env'):
307             self.set('env', " ")
308
309         if self.get('version') == "5":
310             self.begin_start_time = tnow()
311             # Some information to check the command for OMF5
312             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
313                 self.get('appid') + " : " + self._path + " : " + \
314                 self._args + " : " + self.get('env')
315             self.debug(msg)
316
317             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
318                 self._args, self._path, self.get('env'))
319         else:
320             #For OMF 6
321             if self._start_cnt == 0:
322                 props = {}
323                 props['state'] = "running"
324     
325                 guards = {}
326                 guards['type'] = "application"
327                 guards['name'] = self.get('command')
328
329                 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
330
331             if self._start_cnt > confirmation_counter:
332                 msg = "Couldn't retrieve the confirmation that the application started"
333                 self.error(msg)
334                 raise RuntimeError(msg)
335
336             res = self.check_start(self._topic_app)
337             if not res:
338                 self._start_cnt +=1
339                 self.ec.schedule(reschedule_check, self.start)
340                 return
341
342         super(OMFApplication, self).do_start()
343
344     def check_start(self, uid):
345         """ Check, through the mail box in the parser, 
346         if the confirmation of the start has been received
347
348         :param uid: the id of the original message
349         :type guid: string
350
351         """
352         res = self._omf_api.check_mailbox("started", uid)
353         if res : 
354             return True
355         return False
356
357     def do_stop(self):
358         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
359         kill the application. 
360         State is set to STOPPED after the message is sent.
361
362         """
363
364
365         if self.get('version') == 5:
366             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
367         super(OMFApplication, self).do_stop()
368
369     def check_release(self, cid):
370         """ Check, through the mail box in the parser, 
371         if the confirmation of the release has been received
372
373         :param cid: the id of the original message
374         :type guid: string
375
376         """
377         res = self._omf_api.check_mailbox("release", cid)
378         if res : 
379             return res
380         return False
381
382     def do_release(self):
383         """ Clean the RM at the end of the experiment and release the API.
384
385         """
386         ## For performance test
387         if self.rperf:
388             self.begin_release_time = tnow()
389             self.rperf = False
390
391         if self._omf_api:
392             if self.get('version') == "6" and self._topic_app:
393                 if not self.release_id:
394                     self.release_id = os.urandom(16).encode('hex')
395                     self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
396     
397                 if self._release_cnt < confirmation_counter:
398                     cid = self.check_release(self.release_id)
399                     if not cid:
400                         self._release_cnt +=1
401                         self.ec.schedule(reschedule_check, self.release)
402                         return
403                 else:
404                     msg = "Couldn't retrieve the confirmation of the release"
405                     self.error(msg)
406
407                 # Remove the stdout and stderr of the application
408                 try:
409                     os.remove('/tmp/'+self._topic_app +'.out')
410                     os.remove('/tmp/'+self._topic_app +'.err')
411                 except OSError:
412                     pass
413
414             OMFAPIFactory.release_api(self.get('version'), 
415               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
416                self.get('xmppPassword'), exp_id = self.exp_id)
417
418         super(OMFApplication, self).do_release()
419