7d96cfb071ecfb9a78ea1ef4db741f58b3268e50
[nepi.git] / src / nepi / resources / omf / omf_api.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import ssl
22 import sys
23 import time
24 import hashlib
25 import threading
26
27 from nepi.util.logger import Logger
28
29 from nepi.resources.omf.omf_client import OMFClient
30 from nepi.resources.omf.messages_5_4 import MessageHandler
31
32 from nepi.util.timefuncs import tsformat 
33
34 class OMFAPI(Logger):
35     """
36     .. class:: Class Args :
37       
38         :param slice: Xmpp Slice
39         :type slice: str
40         :param host: Xmpp Server
41         :type host: str
42         :param port: Xmpp Port
43         :type port: str
44         :param password: Xmpp password
45         :type password: str
46         :param xmpp_root: Root of the Xmpp Topic Architecture
47         :type xmpp_root: str
48
49     .. note::
50
51        This class is the implementation of an OMF 5.4 API. Since the version 5.4.1, the Topic Architecture start with OMF_5.4 instead of OMF used for OMF5.3
52
53     """
54     def __init__(self, slice, host, port, password, xmpp_root = None):
55         """
56     
57         :param slice: Xmpp Slice
58         :type slice: str
59         :param host: Xmpp Server
60         :type host: str
61         :param port: Xmpp Port
62         :type port: str
63         :param password: Xmpp password
64         :type password: str
65         :param xmpp_root: Root of the Xmpp Topic Architecture
66         :type xmpp_root: str
67
68         """
69         super(OMFAPI, self).__init__("OMFAPI")
70         
71         date = tsformat()
72         tz = -time.altzone if time.daylight != 0 else -time.timezone
73         date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
74         self._user = "%s-%s" % (slice, date)
75         self._slice = slice
76         self._host = host
77         self._port = port
78         self._password = password
79         self._hostnames = []
80         self._xmpp_root = xmpp_root or "OMF_5.4"
81
82         # OMF xmpp client
83         self._client = None
84         # message handler
85         self._message = None
86
87         if sys.version_info < (3, 0):
88             reload(sys)
89             sys.setdefaultencoding('utf8')
90
91         # instantiate the xmpp client
92         self._init_client()
93
94         # register xmpp nodes for the experiment
95         self._enroll_experiment()
96         self._enroll_newexperiment()
97
98         # register xmpp logger for the experiment
99         self._enroll_logger()
100
101     def _init_client(self):
102         """ Initialize XMPP Client
103
104         """
105         jid = "%s@%s" % (self._user, self._host)
106         xmpp = OMFClient(jid, self._password)
107         # PROTOCOL_SSLv3 required for compatibility with OpenFire
108         xmpp.ssl_version = ssl.PROTOCOL_SSLv3
109
110         if xmpp.connect((self._host, self._port)):
111             xmpp.process(block=False)
112             while not xmpp.ready:
113                 time.sleep(1)
114             self._client = xmpp
115             self._message = MessageHandler(self._slice, self._user)
116         else:
117             msg = "Unable to connect to the XMPP server."
118             self.error(msg)
119             raise RuntimeError(msg)
120
121     def _enroll_experiment(self):
122         """ Create and Subscribe to the Session Topic
123
124         """
125         xmpp_node = self._exp_session_id
126         self._client.create(xmpp_node)
127         #print "Create experiment sesion id topics !!" 
128         self._client.subscribe(xmpp_node)
129         #print "Subscribe to experiment sesion id topics !!" 
130
131
132     def _enroll_newexperiment(self):
133         """ Publish New Experiment Message
134
135         """
136         address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
137         #print address
138         payload = self._message.newexp_function(self._user, address)
139         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
140         self._client.publish(payload, slice_sid)
141
142     def _enroll_logger(self):
143         """ Create and Subscribe to the Logger Topic
144
145         """
146         xmpp_node = self._logger_session_id
147         self._client.create(xmpp_node)
148         self._client.subscribe(xmpp_node)
149
150         payload = self._message.log_function("2", 
151                 "nodeHandler::NodeHandler", 
152                 "INFO", 
153                 "OMF Experiment Controller 5.4 (git 529a626)")
154         self._client.publish(payload, xmpp_node)
155
156     def _host_session_id(self, hostname):
157         """ Return the Topic Name as /xmpp_root/slice/user/hostname
158
159         :param hostname: Full hrn of the node
160         :type hostname: str
161
162         """
163         return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
164
165     def _host_resource_id(self, hostname):
166         """ Return the Topic Name as /xmpp_root/slice/resources/hostname
167
168         :param hostname: Full hrn of the node
169         :type hostname: str
170
171         """
172         return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
173
174     @property
175     def _exp_session_id(self):
176         """ Return the Topic Name as /xmpp_root/slice/user
177
178         """
179         return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
180
181     @property
182     def _logger_session_id(self):
183         """ Return the Topic Name as /xmpp_root/slice/LOGGER
184
185         """
186         return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
187
188     def delete(self, hostname):
189         """ Delete the topic corresponding to the hostname for this session
190
191         :param hostname: Full hrn of the node
192         :type hostname: str
193
194         """
195         if not hostname in self._hostnames:
196             return
197
198         self._hostnames.remove(hostname)
199
200         xmpp_node = self._host_session_id(hostname)
201         self._client.delete(xmpp_node)
202
203     def enroll_host(self, hostname):
204         """ Create and Subscribe to the session topic and the resources corresponding to the hostname
205
206         :param hostname: Full hrn of the node
207         :type hostname: str
208
209         """
210         if hostname in self._hostnames:
211             return 
212
213         self._hostnames.append(hostname)
214
215         xmpp_node =  self._host_session_id(hostname)
216         self._client.create(xmpp_node)
217         self._client.subscribe(xmpp_node)
218
219         xmpp_node =  self._host_resource_id(hostname)
220         self._client.subscribe(xmpp_node)
221
222         payload = self._message.enroll_function("1", "*", "1", hostname)
223         self._client.publish(payload, xmpp_node)
224
225     def configure(self, hostname, attribute, value):
226         """ Configure attribute on the node
227
228         :param hostname: Full hrn of the node
229         :type hostname: str
230         :param attribute: Attribute that need to be configured (often written as /net/wX/attribute, with X the interface number)
231         :type attribute: str
232         :param value: Value of the attribute
233         :type value: str
234
235         """
236         payload = self._message.configure_function(hostname, value, attribute)
237         xmpp_node =  self._host_session_id(hostname)
238         self._client.publish(payload, xmpp_node)
239
240     def execute(self, hostname, app_id, arguments, path, env):
241         """ Execute command on the node
242
243         :param hostname: Full hrn of the node
244         :type hostname: str
245         :param app_id: Application Id (Any id that represents in a unique way the application)
246         :type app_id: str
247         :param arguments: Arguments of the application
248         :type arguments: str
249         :param path: Path of the application
250         :type path: str
251         :param env: Environnement values for the application
252         :type env: str
253
254         """
255         payload = self._message.execute_function(hostname, app_id, arguments, path, env)
256         xmpp_node =  self._host_session_id(hostname)
257         self._client.publish(payload, xmpp_node)
258
259     def exit(self, hostname, app_id):
260         """ Kill an application started with OMF
261
262         :param hostname: Full hrn of the node
263         :type hostname: str
264         :param app_id: Application Id of the application you want to stop
265         :type app_id: str
266
267         """
268         payload = self._message.exit_function(hostname, app_id)
269         xmpp_node =  self._host_session_id(hostname)
270         self._client.publish(payload, xmpp_node)
271
272     def release(self, hostname):
273         """ Delete the session and logger topics. Then disconnect 
274
275         """
276         if hostname in self._hostnames:
277             self.delete(hostname)
278
279     def disconnect(self) :
280         """ Delete the session and logger topics. Then disconnect 
281
282         """
283         self._client.delete(self._exp_session_id)
284         self._client.delete(self._logger_session_id)
285
286         time.sleep(1)
287         
288         # Wait the send queue to be empty before disconnect
289         self._client.disconnect(wait=True)
290         msg = " Disconnected from XMPP Server"
291         self.debug(msg)
292
293
294 class OMFAPIFactory(object):
295     """ 
296     .. note::
297
298         It allows the different RM to use the same xmpp client if they use the same credentials. 
299         For the moment, it is focused on Xmpp.
300
301     """
302     # use lock to avoid concurrent access to the Api list at the same times by 2 different threads
303     lock = threading.Lock()
304     _apis = dict()
305
306     @classmethod 
307     def get_api(cls, slice, host, port, password):
308         """ Get an OMF Api
309
310         :param slice: Xmpp Slice Name
311         :type slice: str
312         :param host: Xmpp Server Adress
313         :type host: str
314         :param port: Xmpp Port (Default : 5222)
315         :type port: str
316         :param password: Xmpp Password
317         :type password: str
318
319         """
320         if slice and host and port and password:
321             key = cls._make_key(slice, host, port, password)
322             cls.lock.acquire()
323             if key in cls._apis:
324                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
325                 cls._apis[key]['cnt'] += 1
326                 cls.lock.release()
327                 return cls._apis[key]['api']
328             else :
329                 omf_api = cls.create_api(slice, host, port, password)
330                 cls.lock.release()
331                 return omf_api
332         return None
333
334     @classmethod 
335     def create_api(cls, slice, host, port, password):
336         """ Create an OMF API if this one doesn't exist yet with this credentials
337
338         :param slice: Xmpp Slice Name
339         :type slice: str
340         :param host: Xmpp Server Adress
341         :type host: str
342         :param port: Xmpp Port (Default : 5222)
343         :type port: str
344         :param password: Xmpp Password
345         :type password: str
346
347         """
348         omf_api = OMFAPI(slice, host, port, password)
349         key = cls._make_key(slice, host, port, password)
350         cls._apis[key] = {}
351         cls._apis[key]['api'] = omf_api
352         cls._apis[key]['cnt'] = 1
353         return omf_api
354
355     @classmethod 
356     def release_api(cls, slice, host, port, password):
357         """ Release an OMF API with this credentials
358
359         :param slice: Xmpp Slice Name
360         :type slice: str
361         :param host: Xmpp Server Adress
362         :type host: str
363         :param port: Xmpp Port (Default : 5222)
364         :type port: str
365         :param password: Xmpp Password
366         :type password: str
367
368         """
369         if slice and host and port and password:
370             key = cls._make_key(slice, host, port, password)
371             if key in cls._apis:
372                 cls._apis[key]['cnt'] -= 1
373                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
374                 if cls._apis[key]['cnt'] == 0:
375                     omf_api = cls._apis[key]['api']
376                     omf_api.disconnect()
377
378
379     @classmethod 
380     def _make_key(cls, *args):
381         """ Hash the credentials in order to create a key
382
383         :param args: list of arguments used to create the hash (user, host, port, ...)
384         :type args: list of args
385
386         """
387         skey = "".join(map(str, args))
388         return hashlib.md5(skey).hexdigest()
389
390
391