First version of OMF6 working. Just problem of wifi driver are still there
[nepi.git] / src / nepi / resources / omf / application.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import os
22
23 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24         ResourceState, reschedule_delay
25 from nepi.execution.attribute import Attribute, Flags 
26 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
27 from nepi.resources.omf.node import OMFNode
28 from nepi.resources.omf.omf_api_factory import OMFAPIFactory
29
30 from nepi.util import sshfuncs
31
32 @clsinit_copy
33 class OMFApplication(OMFResource):
34     """
35     .. class:: Class Args :
36       
37         :param ec: The Experiment controller
38         :type ec: ExperimentController
39         :param guid: guid of the RM
40         :type guid: int
41
42     """
43     _rtype = "OMFApplication"
44     _authorized_connections = ["OMFNode"]
45
46     @classmethod
47     def _register_attributes(cls):
48         """ Register the attributes of an OMF application
49
50         """
51         command = Attribute("command", "Command to execute")
52         env = Attribute("env", "Environnement variable of the application")
53
54         # For OMF 5:
55         appid = Attribute("appid", "Name of the application")
56         stdin = Attribute("stdin", "Input of the application", default = "")
57         sources = Attribute("sources", "Sources of the application", 
58                      flags = Flags.ExecReadOnly)
59         sshuser = Attribute("sshUser", "user to connect with ssh", 
60                      flags = Flags.ExecReadOnly)
61         sshkey = Attribute("sshKey", "key to use for ssh", 
62                      flags = Flags.ExecReadOnly)
63
64         cls._register_attribute(appid)
65         cls._register_attribute(command)
66         cls._register_attribute(env)
67         cls._register_attribute(stdin)
68         cls._register_attribute(sources)
69         cls._register_attribute(sshuser)
70         cls._register_attribute(sshkey)
71
72     def __init__(self, ec, guid):
73         """
74         :param ec: The Experiment controller
75         :type ec: ExperimentController
76         :param guid: guid of the RM
77         :type guid: int
78         :param creds: Credentials to communicate with the rm (XmppClient for OMF)
79         :type creds: dict
80
81         """
82         super(OMFApplication, self).__init__(ec, guid)
83
84         self.set('command', "")
85         self.set('appid', "")
86         self.path= ""
87         self.args = ""
88         self.set('env', "")
89
90         self._node = None
91
92         self._omf_api = None
93         self._topic_app = None
94         self.create_id = None
95         self.release_id = None
96
97         self.add_set_hook()
98
99     def _init_command(self):
100         comm = self.get('command').split(' ')
101         self.path= comm[0]
102         if len(comm)>1:
103             self.args = ' '.join(comm[1:])
104
105     @property
106     def exp_id(self):
107         return self.ec.exp_id
108
109     @property
110     def node(self):
111         rm_list = self.get_connected(OMFNode.get_rtype())
112         if rm_list: return rm_list[0]
113         return None
114
115     def stdin_hook(self, old_value, new_value):
116         """ Set a hook to the stdin attribute in order to send a message at each time
117         the value of this parameter is changed
118
119         """
120         self._omf_api.send_stdin(self.node.get('hostname'), new_value, self.get('appid'))
121         return new_value
122
123     def add_set_hook(self):
124         """ Initialize the hooks
125
126         """
127         attr = self._attrs["stdin"]
128         attr.set_hook = self.stdin_hook
129
130     def valid_connection(self, guid):
131         """ Check if the connection with the guid in parameter is possible. 
132         Only meaningful connections are allowed.
133
134         :param guid: Guid of RM it will be connected
135         :type guid: int
136         :rtype:  Boolean
137
138         """
139         rm = self.ec.get_resource(guid)
140         if rm.get_rtype() not in self._authorized_connections:
141             msg = ("Connection between %s %s and %s %s refused: "
142                     "An Application can be connected only to a Node" ) % \
143                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
144             self.debug(msg)
145
146             return False
147
148         elif len(self.connections) != 0 :
149             msg = ("Connection between %s %s and %s %s refused: "
150                     "This Application is already connected" ) % \
151                 (self.get_rtype(), self._guid, rm.get_rtype(), guid)
152             self.debug(msg)
153
154             return False
155
156         else :
157             msg = "Connection between %s %s and %s %s accepted" % (
158                     self.get_rtype(), self._guid, rm.get_rtype(), guid)
159             self.debug(msg)
160
161             return True
162
163     def do_deploy(self):
164         """ Deploy the RM. It means nothing special for an application 
165         for now (later it will be upload sources, ...)
166         It becomes DEPLOYED after getting the xmpp client.
167
168         """
169         if not self.node or self.node.state < ResourceState.READY:
170             self.debug("---- RESCHEDULING DEPLOY ---- node state %s "
171                        % self.node.state )
172             self.ec.schedule(reschedule_delay, self.deploy)
173             return
174
175         self._init_command()
176
177         self.set('xmppUser',self.node.get('xmppUser'))
178         self.set('xmppServer',self.node.get('xmppServer'))
179         self.set('xmppPort',self.node.get('xmppPort'))
180         self.set('xmppPassword',self.node.get('xmppPassword'))
181         self.set('version',self.node.get('version'))
182
183         if not self.get('xmppServer'):
184             msg = "XmppServer is not initialzed. XMPP Connections impossible"
185             self.error(msg)
186             raise RuntimeError, msg
187
188         if not (self.get('xmppUser') or self.get('xmppPort') 
189                    or self.get('xmppPassword')):
190             msg = "Credentials are not all initialzed. Default values will be used"
191             self.warn(msg)
192
193         if not self._omf_api :
194             self._omf_api = OMFAPIFactory.get_api(self.get('version'), 
195               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
196                self.get('xmppPassword'), exp_id = self.exp_id)
197
198         if self.get('version') == "5":
199             if self.get('sources'):
200                 gateway = ResourceGateway.AMtoGateway[self.get('xmppServer')]
201                 user = self.get('sshUser') or self.get('xmppUser')
202                 dst = user + "@"+ gateway + ":"
203                 (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
204         else :
205             # For OMF 6 :
206             if not self.create_id:
207                 props = {}
208                 if self.get('command'):
209                     props['application:binary_path'] = self.get('command')
210                     props['application:hrn'] = self.get('command')
211                     props['application:membership'] = self._topic_app
212                 props['application:type'] = "application"
213     
214                 self.create_id = os.urandom(16).encode('hex')
215                 self._omf_api.frcp_create( self.create_id, self.node.get('hostname'), "application", props = props)
216     
217             uid = self.check_deploy(self.create_id)
218             if not uid:
219                 self.ec.schedule(reschedule_delay, self.deploy)
220                 return
221         
222             self._topic_app = uid
223             self._omf_api.enroll_topic(self._topic_app)
224
225         super(OMFApplication, self).do_deploy()
226
227     def check_deploy(self, cid):
228         uid = self._omf_api.check_mailbox("create", cid)
229         if uid : 
230             return uid
231         return False
232
233     def do_start(self):
234         """ Start the RM. It means : Send Xmpp Message Using OMF protocol 
235          to execute the application. 
236          It becomes STARTED before the messages are sent (for coordination)
237
238         """
239         if not self.get('command') :
240             msg = "Application's Command is not initialized"
241             self.error(msg)
242             raise RuntimeError, msg
243
244         if not self.get('env'):
245             self.set('env', " ")
246
247         if self.get('version') == "5":
248             # Some information to check the command for OMF5
249             msg = " " + self.get_rtype() + " ( Guid : " + str(self._guid) +") : " + \
250                 self.get('appid') + " : " + self.path + " : " + \
251                 self.args + " : " + self.get('env')
252             self.debug(msg)
253
254             self._omf_api.execute(self.node.get('hostname'),self.get('appid'), \
255                 self.get('args'), self.get('path'), self.get('env'))
256         else:
257             #For OMF 6
258             props = {}
259             props['state'] = "running"
260     
261             guards = {}
262             guards['type'] = "application"
263             guards['name'] = self.get('command')
264
265             self._omf_api.frcp_configure(self._topic_app, props = props, guards = guards )
266
267
268         super(OMFApplication, self).do_start()
269
270     def do_stop(self):
271         """ Stop the RM. It means : Send Xmpp Message Using OMF protocol to 
272         kill the application. 
273         State is set to STOPPED after the message is sent.
274
275         """
276         if self.get('version') == 5:
277             self._omf_api.exit(self.node.get('hostname'),self.get('appid'))
278         super(OMFApplication, self).do_stop()
279
280     def check_release(self, cid):
281         res = self._omf_api.check_mailbox("release", cid)
282         if res : 
283             return res
284         return False
285
286     def do_release(self):
287         """ Clean the RM at the end of the experiment and release the API.
288
289         """
290         if self.get('version') == "6":
291             if not self.release_id:
292                 self.release_id = os.urandom(16).encode('hex')
293                 self._omf_api.frcp_release( self.release_id, self.node.get('hostname'),self._topic_app, res_id=self._topic_app)
294     
295             cid = self.check_release(self.release_id)
296             if not cid:
297                 self.ec.schedule(reschedule_delay, self.release)
298                 return
299
300         if self._omf_api:
301             OMFAPIFactory.release_api(self.get('version'), 
302               self.get('xmppServer'), self.get('xmppUser'), self.get('xmppPort'),
303                self.get('xmppPassword'), exp_id = self.exp_id)
304
305         super(OMFApplication, self).do_release()
306