9654b68df83442e731bcb2e00bad845c6e0c782e
[nepi.git] / src / nepi / resources / omf / omf_api.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import datetime
22 import ssl
23 import sys
24 import time
25 import hashlib
26 import threading
27
28 from nepi.util.logger import Logger
29
30 from nepi.resources.omf.omf_client import OMFClient
31 from nepi.resources.omf.messages_5_4 import MessageHandler
32
33 class OMFAPI(Logger):
34     """
35     .. class:: Class Args :
36       
37         :param slice: Xmpp Slice
38         :type slice: str
39         :param host: Xmpp Server
40         :type host: str
41         :param port: Xmpp Port
42         :type port: str
43         :param password: Xmpp password
44         :type password: str
45         :param xmpp_root: Root of the Xmpp Topic Architecture
46         :type xmpp_root: str
47
48     .. note::
49
50        This class is the implementation of an OMF 5.4 API. Since the version 5.4.1, the Topic Architecture start with OMF_5.4 instead of OMF used for OMF5.3
51
52     """
53     def __init__(self, slice, host, port, password, xmpp_root = None):
54         """
55     
56         :param slice: Xmpp Slice
57         :type slice: str
58         :param host: Xmpp Server
59         :type host: str
60         :param port: Xmpp Port
61         :type port: str
62         :param password: Xmpp password
63         :type password: str
64         :param xmpp_root: Root of the Xmpp Topic Architecture
65         :type xmpp_root: str
66
67         """
68         super(OMFAPI, self).__init__("OMFAPI")
69         
70         date = datetime.datetime.now().strftime("%Y-%m-%dt%H.%M.%S")
71         tz = -time.altzone if time.daylight != 0 else -time.timezone
72         date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
73         self._user = "%s-%s" % (slice, date)
74         self._slice = slice
75         self._host = host
76         self._port = port
77         self._password = password
78         self._hostnames = []
79         self._xmpp_root = xmpp_root or "OMF_5.4"
80
81         # OMF xmpp client
82         self._client = None
83         # message handler
84         self._message = None
85
86         if sys.version_info < (3, 0):
87             reload(sys)
88             sys.setdefaultencoding('utf8')
89
90         # instantiate the xmpp client
91         self._init_client()
92
93         # register xmpp nodes for the experiment
94         self._enroll_experiment()
95         self._enroll_newexperiment()
96
97         # register xmpp logger for the experiment
98         self._enroll_logger()
99
100     def _init_client(self):
101         """ Initialize XMPP Client
102
103         """
104         jid = "%s@%s" % (self._user, self._host)
105         xmpp = OMFClient(jid, self._password)
106         # PROTOCOL_SSLv3 required for compatibility with OpenFire
107         xmpp.ssl_version = ssl.PROTOCOL_SSLv3
108
109         if xmpp.connect((self._host, self._port)):
110             xmpp.process(block=False)
111             while not xmpp.ready:
112                 time.sleep(1)
113             self._client = xmpp
114             self._message = MessageHandler(self._slice, self._user)
115         else:
116             msg = "Unable to connect to the XMPP server."
117             self.error(msg)
118             raise RuntimeError(msg)
119
120     def _enroll_experiment(self):
121         """ Create and Subscribe to the Session Topic
122
123         """
124         xmpp_node = self._exp_session_id
125         self._client.create(xmpp_node)
126         #print "Create experiment sesion id topics !!" 
127         self._client.subscribe(xmpp_node)
128         #print "Subscribe to experiment sesion id topics !!" 
129
130
131     def _enroll_newexperiment(self):
132         """ Publish New Experiment Message
133
134         """
135         address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice, self._user)
136         #print address
137         payload = self._message.newexp_function(self._user, address)
138         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
139         self._client.publish(payload, slice_sid)
140
141     def _enroll_logger(self):
142         """ Create and Subscribe to the Logger Topic
143
144         """
145         xmpp_node = self._logger_session_id
146         self._client.create(xmpp_node)
147         self._client.subscribe(xmpp_node)
148
149         payload = self._message.log_function("2", 
150                 "nodeHandler::NodeHandler", 
151                 "INFO", 
152                 "OMF Experiment Controller 5.4 (git 529a626)")
153         self._client.publish(payload, xmpp_node)
154
155     def _host_session_id(self, hostname):
156         """ Return the Topic Name as /xmpp_root/slice/user/hostname
157
158         :param hostname: Full hrn of the node
159         :type hostname: str
160
161         """
162         return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, hostname)
163
164     def _host_resource_id(self, hostname):
165         """ Return the Topic Name as /xmpp_root/slice/resources/hostname
166
167         :param hostname: Full hrn of the node
168         :type hostname: str
169
170         """
171         return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
172
173     @property
174     def _exp_session_id(self):
175         """ Return the Topic Name as /xmpp_root/slice/user
176
177         """
178         return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
179
180     @property
181     def _logger_session_id(self):
182         """ Return the Topic Name as /xmpp_root/slice/LOGGER
183
184         """
185         return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
186
187     def delete(self, hostname):
188         """ Delete the topic corresponding to the hostname for this session
189
190         :param hostname: Full hrn of the node
191         :type hostname: str
192
193         """
194         if not hostname in self._hostnames:
195             return
196
197         self._hostnames.remove(hostname)
198
199         xmpp_node = self._host_session_id(hostname)
200         self._client.delete(xmpp_node)
201
202     def enroll_host(self, hostname):
203         """ Create and Subscribe to the session topic and the resources corresponding to the hostname
204
205         :param hostname: Full hrn of the node
206         :type hostname: str
207
208         """
209         if hostname in self._hostnames:
210             return 
211
212         self._hostnames.append(hostname)
213
214         xmpp_node =  self._host_session_id(hostname)
215         self._client.create(xmpp_node)
216         self._client.subscribe(xmpp_node)
217
218         xmpp_node =  self._host_resource_id(hostname)
219         self._client.subscribe(xmpp_node)
220
221         payload = self._message.enroll_function("1", "*", "1", hostname)
222         self._client.publish(payload, xmpp_node)
223
224     def configure(self, hostname, attribute, value):
225         """ Configure attribute on the node
226
227         :param hostname: Full hrn of the node
228         :type hostname: str
229         :param attribute: Attribute that need to be configured (often written as /net/wX/attribute, with X the interface number)
230         :type attribute: str
231         :param value: Value of the attribute
232         :type value: str
233
234         """
235         payload = self._message.configure_function(hostname, value, attribute)
236         xmpp_node =  self._host_session_id(hostname)
237         self._client.publish(payload, xmpp_node)
238
239     def execute(self, hostname, app_id, arguments, path, env):
240         """ Execute command on the node
241
242         :param hostname: Full hrn of the node
243         :type hostname: str
244         :param app_id: Application Id (Any id that represents in a unique way the application)
245         :type app_id: str
246         :param arguments: Arguments of the application
247         :type arguments: str
248         :param path: Path of the application
249         :type path: str
250         :param env: Environnement values for the application
251         :type env: str
252
253         """
254         payload = self._message.execute_function(hostname, app_id, arguments, path, env)
255         xmpp_node =  self._host_session_id(hostname)
256         self._client.publish(payload, xmpp_node)
257
258     def exit(self, hostname, app_id):
259         """ Kill an application started with OMF
260
261         :param hostname: Full hrn of the node
262         :type hostname: str
263         :param app_id: Application Id of the application you want to stop
264         :type app_id: str
265
266         """
267         payload = self._message.exit_function(hostname, app_id)
268         xmpp_node =  self._host_session_id(hostname)
269         self._client.publish(payload, xmpp_node)
270
271     def release(self, hostname):
272         """ Delete the session and logger topics. Then disconnect 
273
274         """
275         if hostname in self._hostnames:
276             self.delete(hostname)
277
278     def disconnect(self) :
279         """ Delete the session and logger topics. Then disconnect 
280
281         """
282         self._client.delete(self._exp_session_id)
283         self._client.delete(self._logger_session_id)
284
285         time.sleep(1)
286         
287         # Wait the send queue to be empty before disconnect
288         self._client.disconnect(wait=True)
289         msg = " Disconnected from XMPP Server"
290         self.debug(msg)
291
292
293 class OMFAPIFactory(object):
294     """ 
295     .. note::
296
297         It allows the different RM to use the same xmpp client if they use the same credentials. 
298         For the moment, it is focused on Xmpp.
299
300     """
301     # use lock to avoid concurrent access to the Api list at the same times by 2 different threads
302     lock = threading.Lock()
303     _apis = dict()
304
305     @classmethod 
306     def get_api(cls, slice, host, port, password):
307         """ Get an OMF Api
308
309         :param slice: Xmpp Slice Name
310         :type slice: str
311         :param host: Xmpp Server Adress
312         :type host: str
313         :param port: Xmpp Port (Default : 5222)
314         :type port: str
315         :param password: Xmpp Password
316         :type password: str
317
318         """
319         if slice and host and port and password:
320             key = cls._make_key(slice, host, port, password)
321             cls.lock.acquire()
322             if key in cls._apis:
323                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
324                 cls._apis[key]['cnt'] += 1
325                 cls.lock.release()
326                 return cls._apis[key]['api']
327             else :
328                 omf_api = cls.create_api(slice, host, port, password)
329                 cls.lock.release()
330                 return omf_api
331         return None
332
333     @classmethod 
334     def create_api(cls, slice, host, port, password):
335         """ Create an OMF API if this one doesn't exist yet with this credentials
336
337         :param slice: Xmpp Slice Name
338         :type slice: str
339         :param host: Xmpp Server Adress
340         :type host: str
341         :param port: Xmpp Port (Default : 5222)
342         :type port: str
343         :param password: Xmpp Password
344         :type password: str
345
346         """
347         omf_api = OMFAPI(slice, host, port, password)
348         key = cls._make_key(slice, host, port, password)
349         cls._apis[key] = {}
350         cls._apis[key]['api'] = omf_api
351         cls._apis[key]['cnt'] = 1
352         return omf_api
353
354     @classmethod 
355     def release_api(cls, slice, host, port, password):
356         """ Release an OMF API with this credentials
357
358         :param slice: Xmpp Slice Name
359         :type slice: str
360         :param host: Xmpp Server Adress
361         :type host: str
362         :param port: Xmpp Port (Default : 5222)
363         :type port: str
364         :param password: Xmpp Password
365         :type password: str
366
367         """
368         if slice and host and port and password:
369             key = cls._make_key(slice, host, port, password)
370             if key in cls._apis:
371                 cls._apis[key]['cnt'] -= 1
372                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
373                 if cls._apis[key]['cnt'] == 0:
374                     omf_api = cls._apis[key]['api']
375                     omf_api.disconnect()
376
377
378     @classmethod 
379     def _make_key(cls, *args):
380         """ Hash the credentials in order to create a key
381
382         :param args: list of arguments used to create the hash (user, host, port, ...)
383         :type args: list of args
384
385         """
386         skey = "".join(map(str, args))
387         return hashlib.md5(skey).hexdigest()
388
389
390