5def17b91f1868c3c4ffea4c077186c9b1f2cb92
[nepi.git] / src / nepi / resources / omf / omf_api.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import ssl
22 import sys
23 import time
24 import hashlib
25 import threading
26
27 from nepi.util.logger import Logger
28
29 from nepi.resources.omf.omf_client import OMFClient
30 from nepi.resources.omf.messages_5_4 import MessageHandler
31
32 class OMFAPI(Logger):
33     """
34     .. class:: Class Args :
35       
36         :param slice: Xmpp Slice
37         :type slice: str
38         :param host: Xmpp Server
39         :type host: str
40         :param port: Xmpp Port
41         :type port: str
42         :param password: Xmpp password
43         :type password: str
44         :param xmpp_root: Root of the Xmpp Topic Architecture
45         :type xmpp_root: str
46
47     .. note::
48
49        This class is the implementation of an OMF 5.4 API. 
50        Since the version 5.4.1, the Topic Architecture start with OMF_5.4 
51        instead of OMF used for OMF5.3
52
53     """
54     def __init__(self, slice, host, port, password, xmpp_root = None, 
55             exp_id = None):
56         """
57     
58         :param slice: Xmpp Slice
59         :type slice: str
60         :param host: Xmpp Server
61         :type host: str
62         :param port: Xmpp Port
63         :type port: str
64         :param password: Xmpp password
65         :type password: str
66         :param xmpp_root: Root of the Xmpp Topic Architecture
67         :type xmpp_root: str
68
69         """
70         super(OMFAPI, self).__init__("OMFAPI")
71         self._exp_id = exp_id 
72         self._user = "%s-%s" % (slice, self._exp_id)
73         self._slice = slice
74         self._host = host
75         self._port = port
76         self._password = password
77         self._hostnames = []
78         self._xmpp_root = xmpp_root or "OMF_5.4"
79
80         # OMF xmpp client
81         self._client = None
82
83         # message handler
84         self._message = None
85
86         if sys.version_info < (3, 0):
87             reload(sys)
88             sys.setdefaultencoding('utf8')
89
90         # instantiate the xmpp client
91         self._init_client()
92
93         # register xmpp nodes for the experiment
94         self._enroll_experiment()
95         self._enroll_newexperiment()
96
97         # register xmpp logger for the experiment
98         self._enroll_logger()
99
100     def _init_client(self):
101         """ Initialize XMPP Client
102
103         """
104         jid = "%s@%s" % (self._user, self._host)
105         xmpp = OMFClient(jid, self._password)
106         # PROTOCOL_SSLv3 required for compatibility with OpenFire
107         xmpp.ssl_version = ssl.PROTOCOL_SSLv3
108
109         if xmpp.connect((self._host, self._port)):
110             xmpp.process(block=False)
111             while not xmpp.ready:
112                 time.sleep(1)
113             self._client = xmpp
114             self._message = MessageHandler(self._slice, self._user)
115         else:
116             msg = "Unable to connect to the XMPP server."
117             self.error(msg)
118             raise RuntimeError(msg)
119
120     def _enroll_experiment(self):
121         """ Create and Subscribe to the Session Topic
122
123         """
124         xmpp_node = self._exp_session_id
125         self._client.create(xmpp_node)
126         #print "Create experiment sesion id topics !!" 
127         self._client.subscribe(xmpp_node)
128         #print "Subscribe to experiment sesion id topics !!" 
129
130
131     def _enroll_newexperiment(self):
132         """ Publish New Experiment Message
133
134         """
135         address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice,
136                 self._user)
137         #print address
138         payload = self._message.newexp_function(self._user, address)
139         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
140         self._client.publish(payload, slice_sid)
141
142     def _enroll_logger(self):
143         """ Create and Subscribe to the Logger Topic
144
145         """
146         xmpp_node = self._logger_session_id
147         self._client.create(xmpp_node)
148         self._client.subscribe(xmpp_node)
149
150         payload = self._message.log_function("2", 
151                 "nodeHandler::NodeHandler", 
152                 "INFO", 
153                 "OMF Experiment Controller 5.4 (git 529a626)")
154         self._client.publish(payload, xmpp_node)
155
156     def _host_session_id(self, hostname):
157         """ Return the Topic Name as /xmpp_root/slice/user/hostname
158
159         :param hostname: Full hrn of the node
160         :type hostname: str
161
162         """
163         return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, 
164                 hostname)
165
166     def _host_resource_id(self, hostname):
167         """ Return the Topic Name as /xmpp_root/slice/resources/hostname
168
169         :param hostname: Full hrn of the node
170         :type hostname: str
171
172         """
173         return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
174
175     @property
176     def _exp_session_id(self):
177         """ Return the Topic Name as /xmpp_root/slice/user
178
179         """
180         return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
181
182     @property
183     def _logger_session_id(self):
184         """ Return the Topic Name as /xmpp_root/slice/LOGGER
185
186         """
187         return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
188
189     def delete(self, hostname):
190         """ Delete the topic corresponding to the hostname for this session
191
192         :param hostname: Full hrn of the node
193         :type hostname: str
194
195         """
196         if not hostname in self._hostnames:
197             return
198
199         self._hostnames.remove(hostname)
200
201         xmpp_node = self._host_session_id(hostname)
202         self._client.delete(xmpp_node)
203
204     def enroll_host(self, hostname):
205         """ Create and Subscribe to the session topic and the resources
206             corresponding to the hostname
207
208         :param hostname: Full hrn of the node
209         :type hostname: str
210
211         """
212         if hostname in self._hostnames:
213             return 
214
215         self._hostnames.append(hostname)
216
217         xmpp_node =  self._host_session_id(hostname)
218         self._client.create(xmpp_node)
219         self._client.subscribe(xmpp_node)
220
221         xmpp_node =  self._host_resource_id(hostname)
222         self._client.subscribe(xmpp_node)
223
224         payload = self._message.enroll_function("1", "*", "1", hostname)
225         self._client.publish(payload, xmpp_node)
226
227     def configure(self, hostname, attribute, value):
228         """ Configure attribute on the node
229
230         :param hostname: Full hrn of the node
231         :type hostname: str
232         :param attribute: Attribute that need to be configured (
233             often written as /net/wX/attribute, with X the interface number)
234         :type attribute: str
235         :param value: Value of the attribute
236         :type value: str
237
238         """
239         payload = self._message.configure_function(hostname, value, attribute)
240         xmpp_node =  self._host_session_id(hostname)
241         self._client.publish(payload, xmpp_node)
242
243     
244     def send_stdin(self, hostname, value, app_id):
245         """ Send to the stdin of the application the value
246
247         :param hostname: Full hrn of the node
248         :type hostname: str
249         :param appid: Application Id (Any id that represents in a unique 
250             way the application)
251         :type appid: str
252         :param value: parameter to execute in the stdin of the application
253         :type value: str
254
255         """
256         payload = self._message.stdin_function(hostname, value, app_id)
257         xmpp_node =  self._host_session_id(hostname)
258         self._client.publish(payload, xmpp_node)
259
260
261     def execute(self, hostname, app_id, arguments, path, env):
262         """ Execute command on the node
263
264         :param hostname: Full hrn of the node
265         :type hostname: str
266         :param app_id: Application Id (Any id that represents in a unique 
267             way the application)
268         :type app_id: str
269         :param arguments: Arguments of the application
270         :type arguments: str
271         :param path: Path of the application
272         :type path: str
273         :param env: Environnement values for the application
274         :type env: str
275
276         """
277         payload = self._message.execute_function(hostname, app_id, arguments, 
278                 path, env)
279         xmpp_node =  self._host_session_id(hostname)
280         self._client.publish(payload, xmpp_node)
281
282     def exit(self, hostname, app_id):
283         """ Kill an application started with OMF
284
285         :param hostname: Full hrn of the node
286         :type hostname: str
287         :param app_id: Application Id of the application you want to stop
288         :type app_id: str
289
290         """
291         payload = self._message.exit_function(hostname, app_id)
292         xmpp_node =  self._host_session_id(hostname)
293         self._client.publish(payload, xmpp_node)
294
295     def release(self, hostname):
296         """ Delete the session and logger topics. Then disconnect 
297
298         """
299         if hostname in self._hostnames:
300             self.delete(hostname)
301
302     def disconnect(self) :
303         """ Delete the session and logger topics. Then disconnect 
304
305         """
306         self._client.delete(self._exp_session_id)
307         self._client.delete(self._logger_session_id)
308
309         time.sleep(1)
310         
311         # Wait the send queue to be empty before disconnect
312         self._client.disconnect(wait=True)
313         msg = " Disconnected from XMPP Server"
314         self.debug(msg)
315
316
317 class OMFAPIFactory(object):
318     """ 
319     .. note::
320
321         It allows the different RM to use the same xmpp client if they use 
322         the same credentials.  For the moment, it is focused on XMPP.
323
324     """
325     # use lock to avoid concurrent access to the Api list at the same times by 2 
326     # different threads
327     lock = threading.Lock()
328     _apis = dict()
329
330     @classmethod 
331     def get_api(cls, slice, host, port, password, exp_id = None):
332         """ Get an OMF Api
333
334         :param slice: Xmpp Slice Name
335         :type slice: str
336         :param host: Xmpp Server Adress
337         :type host: str
338         :param port: Xmpp Port (Default : 5222)
339         :type port: str
340         :param password: Xmpp Password
341         :type password: str
342
343         """
344         if slice and host and port and password:
345             key = cls._make_key(slice, host, port, password, exp_id)
346             cls.lock.acquire()
347             if key in cls._apis:
348                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
349                 cls._apis[key]['cnt'] += 1
350                 cls.lock.release()
351                 return cls._apis[key]['api']
352             else :
353                 omf_api = cls.create_api(slice, host, port, password, exp_id)
354                 cls.lock.release()
355                 return omf_api
356         return None
357
358     @classmethod 
359     def create_api(cls, slice, host, port, password, exp_id):
360         """ Create an OMF API if this one doesn't exist yet with this credentials
361
362         :param slice: Xmpp Slice Name
363         :type slice: str
364         :param host: Xmpp Server Adress
365         :type host: str
366         :param port: Xmpp Port (Default : 5222)
367         :type port: str
368         :param password: Xmpp Password
369         :type password: str
370
371         """
372         omf_api = OMFAPI(slice, host, port, password, exp_id = exp_id)
373         key = cls._make_key(slice, host, port, password, exp_id)
374         cls._apis[key] = {}
375         cls._apis[key]['api'] = omf_api
376         cls._apis[key]['cnt'] = 1
377         return omf_api
378
379     @classmethod 
380     def release_api(cls, slice, host, port, password, exp_id = None):
381         """ Release an OMF API with this credentials
382
383         :param slice: Xmpp Slice Name
384         :type slice: str
385         :param host: Xmpp Server Adress
386         :type host: str
387         :param port: Xmpp Port (Default : 5222)
388         :type port: str
389         :param password: Xmpp Password
390         :type password: str
391
392         """
393         if slice and host and port and password:
394             key = cls._make_key(slice, host, port, password, exp_id)
395             if key in cls._apis:
396                 cls._apis[key]['cnt'] -= 1
397                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
398                 if cls._apis[key]['cnt'] == 0:
399                     omf_api = cls._apis[key]['api']
400                     omf_api.disconnect()
401
402
403     @classmethod 
404     def _make_key(cls, *args):
405         """ Hash the credentials in order to create a key
406
407         :param args: list of arguments used to create the hash (user, host, port, ...)
408         :type args: list of args
409
410         """
411         skey = "".join(map(str, args))
412         return hashlib.md5(skey).hexdigest()
413
414
415