use print() - import print_function - should be fine for both py2 and py3
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Julien Tribino <julien.tribino@inria.fr>
19
20 from __future__ import print_function
21
22 import os
23
24 from nepi.util.timefuncs import tnow
25 from nepi.execution.resource import ResourceManager, clsinit_copy, \
26         ResourceState
27 from nepi.execution.trace import Trace, TraceAttr
28 from nepi.execution.attribute import Attribute, Flags 
29 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
30 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
31 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
32
33 from nepi.util import sshfuncs
34
35 @clsinit_copy
36 class OMFApplication(OMFResource):
37     """
38     .. class:: Class Args :
39       
40         :param ec: The Experiment controller
41         :type ec: ExperimentController
42         :param guid: guid of the RM
43         :type guid: int
44
45     """
46     _rtype = "omf::Application"
47     _authorized_connections = ["omf::Node", "wilabt::sfa::Node"]
48
49     @classmethod
50     def _register_attributes(cls):
51         """ Register the attributes of an OMF application
52
53         """
54         command = Attribute("command", "Command to execute")
55         env = Attribute("env", "Environnement variable of the application")
56
57         # For OMF 5:
58         appid = Attribute("appid", "Name of the application")
59         stdin = Attribute("stdin", "Input of the application", default = "")
60         sources = Attribute("sources", "Sources of the application", 
61                      flags = Flags.Design)
62         sshuser = Attribute("sshUser", "user to connect with ssh", 
63                      flags = Flags.Design)
64         sshkey = Attribute("sshKey", "key to use for ssh", 
65                      flags = Flags.Design)
66
67         cls._register_attribute(appid)
68         cls._register_attribute(command)
69         cls._register_attribute(env)
70         cls._register_attribute(stdin)
71         cls._register_attribute(sources)
72         cls._register_attribute(sshuser)
73         cls._register_attribute(sshkey)
74
75     def __init__(self, ec, guid):
76         """
77         :param ec: The Experiment controller
78         :type ec: ExperimentController
79         :param guid: guid of the RM
80         :type guid: int
81         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
82         :type creds: dict
83
84         """
85         super(OMFApplication, self).__init__(ec, guid)
86
87         self.set('command', "")
88         self.set('appid', "")
89         self._path= ""
90         self._args = ""
91         self.set('env', "")
92
93         self._node = None
94
95         self._omf_api = None
96         self._topic_app = None
97         self.create_id = None
98         self._create_cnt = 0
99         self._start_cnt = 0
100         self.release_id = None
101         self._release_cnt = 0
102
103         # For performance tests
104         self.begin_deploy_time = None
105         self.begin_start_time = None
106         self.begin_release_time = None
107         self.dperf = True
108         self.sperf = True
109         self.rperf = True
110
111         self.add_set_hook()
112
113     def _init_command(self):
114         comm = self.get('command').split(' ')
115         self._path= comm[0]
116         if len(comm)>1:
117             self._args = ' '.join(comm[1:])
118
119     @property
120     def exp_id(self):
121         return self.ec.exp_id
122
123     @property
124     def node(self):
125         rm_list = self.get_connected(OMFNode.get_rtype())
126         if rm_list: return rm_list[0]
127         return None
128
129     def stdin_hook(self, old_value, new_value):
130         """ Set a hook to the stdin attribute in order to send a message at each time
131         the value of this parameter is changed. Used ofr OMF 5.4 only
132
133         """
134         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
135         return new_value
136
137     def add_set_hook(self):
138         """ Initialize the hooks for OMF 5.4 only
139
140         """
141         attr = self._attrs["stdin"]
142         attr.set_hook = self.stdin_hook
143
144     def valid_connection(self, guid):
145         """ Check if the connection with the guid in parameter is possible. 
146         Only meaningful connections are allowed.
147
148         :param guid: Guid of RM it will be connected
149         :type guid: int
150         :rtype:  Boolean
151
152         """
153         rm = self.ec.get_resource(guid)
154         if rm.get_rtype() not in self._authorized_connections:
155             msg = ("Connection between %s %s and %s %s refused: "
156                     "An Application can be connected only to a Node" ) % \
157                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
158             self.debug(msg)
159
160             return False
161
162         elif len(self.connections) != 0 :
163             msg = ("Connection between %s %s and %s %s refused: "
164                     "This Application is already connected" ) % \
165                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
166             self.debug(msg)
167
168             return False
169
170         else :
171             msg = "Connection between %s %s and %s %s accepted" % (
172                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
173             self.debug(msg)
174
175             return True
176
177     def do_deploy(self):
178         """ Deploy the RM. It means nothing special for an application 
179         for now (later it will be upload sources, ...)
180         It becomes DEPLOYED after the topic for the application has been created
181
182         """
183         if not self.node or self.node.state < ResourceState.READY:
184             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
185                        % self.node.state )
186             self.ec.schedule(self.reschedule_delay, self.deploy)
187             return
188
189         ## For performance test
190         if self.dperf:
191             self.begin_deploy_time = tnow()
192             self.dperf = False
193
194         self._init_command()
195
196         self.set('xmppUser',self.node.get('xmppUser'))
197         self.set('xmppServer',self.node.get('xmppServer'))
198         self.set('xmppPort',self.node.get('xmppPort'))
199         self.set('xmppPassword',self.node.get('xmppPassword'))
200         self.set('version',self.node.get('version'))
201
202         if not self.get('xmppServer'):
203             msg = "XmppServer is not initialzed. XMPP Connections impossible"
204             self.error(msg)
205             raise RuntimeError, msg
206
207         if not (self.get('xmppUser') or self.get('xmppPort') 
208                    or self.get('xmppPassword')):
209             msg = "Credentials are not all initialzed. Default values will be used"
210             self.warn(msg)
211
212         if not self.get('command') :
213             msg = "Application's Command is not initialized"
214             self.error(msg)
215             raise RuntimeError, msg
216
217         if not self._omf_api :
218             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
219               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
220                self.get('xmppPassword'), exp_id = self.exp_id)
221
222         if self.get('version') == "5":
223
224             self.begin_deploy_time = tnow()
225
226             if self.get('sources'):
227                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
228                 user = self.get('sshUser') or self.get('xmppUser')
229                 dst = user + "@"+ gateway + ":"
230                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
231         else :
232             # For OMF 6 :
233             if not self.create_id:
234                 props = {}
235                 if self.get('command'):
236                     props['application:binary_path'] = self.get('command')
237                     props['application:hrn'] = self.get('command')
238                     props['application:membership'] = self._topic_app
239                 props['application:type'] = "application"
240     
241                 self.create_id = os.urandom(16).encode('hex')
242                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
243    
244             if self._create_cnt > confirmation_counter:
245                 msg = "Couldn't retrieve the confirmation of the creation"
246                 self.error(msg)
247                 raise RuntimeError, msg
248
249             uid = self.check_deploy(self.create_id)
250             if not uid:
251                 self._create_cnt +=1
252                 self.ec.schedule(reschedule_check, self.deploy)
253                 return
254
255             self._topic_app = uid
256             self._omf_api.enroll_topic(self._topic_app)
257
258         super(OMFApplication, self).do_deploy()
259
260     def check_deploy(self, cid):
261         """ Check, through the mail box in the parser, 
262         if the confirmation of the creation has been received
263
264         :param cid: the id of the original message
265         :type guid: string
266
267         """
268         uid = self._omf_api.check_mailbox("create", cid)
269         if uid : 
270             return uid
271         return False
272
273     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
274         self.info("Retrieving '%s' trace %s " % (name, attr))
275         if name == 'stdout' :
276             suffix = '.out'
277         elif name == 'stderr' :
278             suffix = '.err'
279         else :
280             suffix = '.misc'
281
282         trace_path = '/tmp/'+ self._topic_app + suffix
283
284         if attr == TraceAttr.PATH:
285             return trace_path
286
287         if attr == TraceAttr.ALL:
288             try:
289                 f = open(trace_path ,'r')
290             except IOError:
291                 print("File with traces has not been found")
292                 return False
293             out = f.read()
294             f.close()
295         return out
296
297
298     def do_start(self):
299         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
300          to execute the application. 
301
302         """
303         ## For performance test
304         if self.sperf:
305             self.begin_start_time = tnow()
306             self.sperf = False
307
308         if not self.get('env'):
309             self.set('env', " ")
310
311         if self.get('version') == "5":
312             self.begin_start_time = tnow()
313             # Some information to check the command for OMF5
314             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
315                 self.get('appid') + " : " + self._path + " : " + \
316                 self._args + " : " + self.get('env')
317             self.debug(msg)
318
319             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
320                 self._args, self._path, self.get('env'))
321         else:
322             #For OMF 6
323             if self._start_cnt == 0:
324                 props = {}
325                 props['state'] = "running"
326     
327                 guards = {}
328                 guards['type'] = "application"
329                 guards['name'] = self.get('command')
330
331                 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
332
333             if self._start_cnt > confirmation_counter:
334                 msg = "Couldn't retrieve the confirmation that the application started"
335                 self.error(msg)
336                 raise RuntimeError, msg
337
338             res = self.check_start(self._topic_app)
339             if not res:
340                 self._start_cnt +=1
341                 self.ec.schedule(reschedule_check, self.start)
342                 return
343
344         super(OMFApplication, self).do_start()
345
346     def check_start(self, uid):
347         """ Check, through the mail box in the parser, 
348         if the confirmation of the start has been received
349
350         :param uid: the id of the original message
351         :type guid: string
352
353         """
354         res = self._omf_api.check_mailbox("started", uid)
355         if res : 
356             return True
357         return False
358
359     def do_stop(self):
360         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
361         kill the application. 
362         State is set to STOPPED after the message is sent.
363
364         """
365
366
367         if self.get('version') == 5:
368             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
369         super(OMFApplication, self).do_stop()
370
371     def check_release(self, cid):
372         """ Check, through the mail box in the parser, 
373         if the confirmation of the release has been received
374
375         :param cid: the id of the original message
376         :type guid: string
377
378         """
379         res = self._omf_api.check_mailbox("release", cid)
380         if res : 
381             return res
382         return False
383
384     def do_release(self):
385         """ Clean the RM at the end of the experiment and release the API.
386
387         """
388         ## For performance test
389         if self.rperf:
390             self.begin_release_time = tnow()
391             self.rperf = False
392
393         if self._omf_api:
394             if self.get('version') == "6" and self._topic_app:
395                 if not self.release_id:
396                     self.release_id = os.urandom(16).encode('hex')
397                     self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
398     
399                 if self._release_cnt < confirmation_counter:
400                     cid = self.check_release(self.release_id)
401                     if not cid:
402                         self._release_cnt +=1
403                         self.ec.schedule(reschedule_check, self.release)
404                         return
405                 else:
406                     msg = "Couldn't retrieve the confirmation of the release"
407                     self.error(msg)
408
409                 # Remove the stdout and stderr of the application
410                 try:
411                     os.remove('/tmp/'+self._topic_app +'.out')
412                     os.remove('/tmp/'+self._topic_app +'.err')
413                 except OSError:
414                     pass
415
416             OMFAPIFactory.release_api(self.get('version'), 
417               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
418                self.get('xmppPassword'), exp_id = self.exp_id)
419
420         super(OMFApplication, self).do_release()
421