update the benchmark for OMF 6
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import os
22
23 from nepi.util.timefuncs import tnow
24 from nepi.execution.resource import ResourceManager, clsinit_copy, \
25         ResourceState, reschedule_delay
26 from nepi.execution.attribute import Attribute, Flags 
27 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
28 from nepi.resources.omf.node import OMFNode, confirmation_counter, reschedule_check
29 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
30
31 from nepi.util import sshfuncs
32
33 @clsinit_copy
34 class OMFApplication(OMFResource):
35     """
36     .. class:: Class Args :
37       
38         :param ec: The Experiment controller
39         :type ec: ExperimentController
40         :param guid: guid of the RM
41         :type guid: int
42
43     """
44     _rtype = "OMFApplication"
45     _authorized_connections = ["OMFNode", "WilabtSfaNode"]
46
47     @classmethod
48     def _register_attributes(cls):
49         """ Register the attributes of an OMF application
50
51         """
52         command = Attribute("command", "Command to execute")
53         env = Attribute("env", "Environnement variable of the application")
54
55         # For OMF 5:
56         appid = Attribute("appid", "Name of the application")
57         stdin = Attribute("stdin", "Input of the application", default = "")
58         sources = Attribute("sources", "Sources of the application", 
59                      flags = Flags.Design)
60         sshuser = Attribute("sshUser", "user to connect with ssh", 
61                      flags = Flags.Design)
62         sshkey = Attribute("sshKey", "key to use for ssh", 
63                      flags = Flags.Design)
64
65         cls._register_attribute(appid)
66         cls._register_attribute(command)
67         cls._register_attribute(env)
68         cls._register_attribute(stdin)
69         cls._register_attribute(sources)
70         cls._register_attribute(sshuser)
71         cls._register_attribute(sshkey)
72
73     def __init__(self, ec, guid):
74         """
75         :param ec: The Experiment controller
76         :type ec: ExperimentController
77         :param guid: guid of the RM
78         :type guid: int
79         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
80         :type creds: dict
81
82         """
83         super(OMFApplication, self).__init__(ec, guid)
84
85         self.set('command', "")
86         self.set('appid', "")
87         self._path= ""
88         self._args = ""
89         self.set('env', "")
90
91         self._node = None
92
93         self._omf_api = None
94         self._topic_app = None
95         self.create_id = None
96         self._create_cnt = 0
97         self._start_cnt = 0
98         self.release_id = None
99         self._release_cnt = 0
100
101         # For performance tests
102         self.begin_deploy_time = None
103         self.begin_start_time = None
104         self.begin_release_time = None
105         self.dperf = True
106         self.sperf = True
107         self.rperf = True
108
109         self.add_set_hook()
110
111     def _init_command(self):
112         comm = self.get('command').split(' ')
113         self._path= comm[0]
114         if len(comm)>1:
115             self._args = ' '.join(comm[1:])
116
117     @property
118     def exp_id(self):
119         return self.ec.exp_id
120
121     @property
122     def node(self):
123         rm_list = self.get_connected(OMFNode.get_rtype())
124         if rm_list: return rm_list[0]
125         return None
126
127     def stdin_hook(self, old_value, new_value):
128         """ Set a hook to the stdin attribute in order to send a message at each time
129         the value of this parameter is changed. Used ofr OMF 5.4 only
130
131         """
132         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
133         return new_value
134
135     def add_set_hook(self):
136         """ Initialize the hooks for OMF 5.4 only
137
138         """
139         attr = self._attrs["stdin"]
140         attr.set_hook = self.stdin_hook
141
142     def valid_connection(self, guid):
143         """ Check if the connection with the guid in parameter is possible. 
144         Only meaningful connections are allowed.
145
146         :param guid: Guid of RM it will be connected
147         :type guid: int
148         :rtype:  Boolean
149
150         """
151         rm = self.ec.get_resource(guid)
152         if rm.get_rtype() not in self._authorized_connections:
153             msg = ("Connection between %s %s and %s %s refused: "
154                     "An Application can be connected only to a Node" ) % \
155                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
156             self.debug(msg)
157
158             return False
159
160         elif len(self.connections) != 0 :
161             msg = ("Connection between %s %s and %s %s refused: "
162                     "This Application is already connected" ) % \
163                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
164             self.debug(msg)
165
166             return False
167
168         else :
169             msg = "Connection between %s %s and %s %s accepted" % (
170                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
171             self.debug(msg)
172
173             return True
174
175     def do_deploy(self):
176         """ Deploy the RM. It means nothing special for an application 
177         for now (later it will be upload sources, ...)
178         It becomes DEPLOYED after the topic for the application has been created
179
180         """
181         if not self.node or self.node.state < ResourceState.READY:
182             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
183                        % self.node.state )
184             self.ec.schedule(reschedule_delay, self.deploy)
185             return
186
187         ## For performance test
188         if self.dperf:
189             self.begin_deploy_time = tnow()
190             self.dperf = False
191
192         self._init_command()
193
194         self.set('xmppUser',self.node.get('xmppUser'))
195         self.set('xmppServer',self.node.get('xmppServer'))
196         self.set('xmppPort',self.node.get('xmppPort'))
197         self.set('xmppPassword',self.node.get('xmppPassword'))
198         self.set('version',self.node.get('version'))
199
200         if not self.get('xmppServer'):
201             msg = "XmppServer is not initialzed. XMPP Connections impossible"
202             self.error(msg)
203             raise RuntimeError, msg
204
205         if not (self.get('xmppUser') or self.get('xmppPort') 
206                    or self.get('xmppPassword')):
207             msg = "Credentials are not all initialzed. Default values will be used"
208             self.warn(msg)
209
210         if not self.get('command') :
211             msg = "Application's Command is not initialized"
212             self.error(msg)
213             raise RuntimeError, msg
214
215         if not self._omf_api :
216             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
217               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
218                self.get('xmppPassword'), exp_id = self.exp_id)
219
220         if self.get('version') == "5":
221
222             self.begin_deploy_time = tnow()
223
224             if self.get('sources'):
225                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
226                 user = self.get('sshUser') or self.get('xmppUser')
227                 dst = user + "@"+ gateway + ":"
228                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
229         else :
230             # For OMF 6 :
231             if not self.create_id:
232                 props = {}
233                 if self.get('command'):
234                     props['application:binary_path'] = self.get('command')
235                     props['application:hrn'] = self.get('command')
236                     props['application:membership'] = self._topic_app
237                 props['application:type'] = "application"
238     
239                 self.create_id = os.urandom(16).encode('hex')
240                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
241    
242             if self._create_cnt > confirmation_counter:
243                 msg = "Couldn't retrieve the confirmation of the creation"
244                 self.error(msg)
245                 raise RuntimeError, msg
246
247             uid = self.check_deploy(self.create_id)
248             if not uid:
249                 self._create_cnt +=1
250                 self.ec.schedule(reschedule_check, self.deploy)
251                 return
252
253             self._topic_app = uid
254             self._omf_api.enroll_topic(self._topic_app)
255
256         super(OMFApplication, self).do_deploy()
257
258     def check_deploy(self, cid):
259         """ Check, through the mail box in the parser, 
260         if the confirmation of the creation has been received
261
262         :param cid: the id of the original message
263         :type guid: string
264
265         """
266         uid = self._omf_api.check_mailbox("create", cid)
267         if uid : 
268             return uid
269         return False
270
271     def do_start(self):
272         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
273          to execute the application. 
274
275         """
276         ## For performance test
277         if self.sperf:
278             self.begin_start_time = tnow()
279             self.sperf = False
280
281         if not self.get('env'):
282             self.set('env', " ")
283
284         if self.get('version') == "5":
285             self.begin_start_time = tnow()
286             # Some information to check the command for OMF5
287             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
288                 self.get('appid') + " : " + self._path + " : " + \
289                 self._args + " : " + self.get('env')
290             self.debug(msg)
291
292             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
293                 self._args, self._path, self.get('env'))
294         else:
295             #For OMF 6
296             if self._start_cnt == 0:
297                 props = {}
298                 props['state'] = "running"
299     
300                 guards = {}
301                 guards['type'] = "application"
302                 guards['name'] = self.get('command')
303
304                 self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
305
306             if self._start_cnt > confirmation_counter:
307                 msg = "Couldn't retrieve the confirmation that the application started"
308                 self.error(msg)
309                 raise RuntimeError, msg
310
311             res = self.check_start(self._topic_app)
312             if not res:
313                 self._start_cnt +=1
314                 self.ec.schedule(reschedule_check, self.start)
315                 return
316
317         super(OMFApplication, self).do_start()
318
319     def check_start(self, uid):
320         """ Check, through the mail box in the parser, 
321         if the confirmation of the start has been received
322
323         :param uid: the id of the original message
324         :type guid: string
325
326         """
327         res = self._omf_api.check_mailbox("started", uid)
328         if res : 
329             return True
330         return False
331
332     def do_stop(self):
333         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
334         kill the application. 
335         State is set to STOPPED after the message is sent.
336
337         """
338         if self.get('version') == 5:
339             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
340         super(OMFApplication, self).do_stop()
341
342     def check_release(self, cid):
343         """ Check, through the mail box in the parser, 
344         if the confirmation of the release has been received
345
346         :param cid: the id of the original message
347         :type guid: string
348
349         """
350         res = self._omf_api.check_mailbox("release", cid)
351         if res : 
352             return res
353         return False
354
355     def do_release(self):
356         """ Clean the RM at the end of the experiment and release the API.
357
358         """
359         ## For performance test
360         if self.rperf:
361             self.begin_release_time = tnow()
362             self.rperf = False
363
364         if self._omf_api:
365             if self.get('version') == "6" and self._topic_app:
366                 if not self.release_id:
367                     self.release_id = os.urandom(16).encode('hex')
368                     self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
369     
370                 if self._release_cnt < confirmation_counter:
371                     cid = self.check_release(self.release_id)
372                     if not cid:
373                         self._release_cnt +=1
374                         self.ec.schedule(reschedule_check, self.release)
375                         return
376                 else:
377                     msg = "Couldn't retrieve the confirmation of the release"
378                     self.error(msg)
379
380
381             OMFAPIFactory.release_api(self.get('version'), 
382               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
383                self.get('xmppPassword'), exp_id = self.exp_id)
384
385         super(OMFApplication, self).do_release()
386