Code cleanup. Setting resource state through specific functions
[nepi.git] / src / nepi / resources / omf / omf_api.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import ssl
22 import sys
23 import time
24 import hashlib
25 import threading
26
27 from nepi.util.logger import Logger
28
29 from nepi.resources.omf.omf_client import OMFClient
30 from nepi.resources.omf.messages_5_4 import MessageHandler
31
32 from nepi.util.timefuncs import tsformat 
33
34 class OMFAPI(Logger):
35     """
36     .. class:: Class Args :
37       
38         :param slice: Xmpp Slice
39         :type slice: str
40         :param host: Xmpp Server
41         :type host: str
42         :param port: Xmpp Port
43         :type port: str
44         :param password: Xmpp password
45         :type password: str
46         :param xmpp_root: Root of the Xmpp Topic Architecture
47         :type xmpp_root: str
48
49     .. note::
50
51        This class is the implementation of an OMF 5.4 API. 
52        Since the version 5.4.1, the Topic Architecture start with OMF_5.4 
53        instead of OMF used for OMF5.3
54
55     """
56     def __init__(self, slice, host, port, password, xmpp_root = None, 
57             exp_id = None):
58         """
59     
60         :param slice: Xmpp Slice
61         :type slice: str
62         :param host: Xmpp Server
63         :type host: str
64         :param port: Xmpp Port
65         :type port: str
66         :param password: Xmpp password
67         :type password: str
68         :param xmpp_root: Root of the Xmpp Topic Architecture
69         :type xmpp_root: str
70
71         """
72         super(OMFAPI, self).__init__("OMFAPI")
73         date = tsformat()
74         tz = -time.altzone if time.daylight != 0 else -time.timezone
75         date += "%+06.2f" % (tz / 3600) # timezone difference is in seconds
76         self._exp_id = exp_id or date
77         self._user = "%s-%s" % (slice, self._exp_id)
78         self._slice = slice
79         self._host = host
80         self._port = port
81         self._password = password
82         self._hostnames = []
83         self._xmpp_root = xmpp_root or "OMF_5.4"
84
85         # OMF xmpp client
86         self._client = None
87
88         # message handler
89         self._message = None
90
91         if sys.version_info < (3, 0):
92             reload(sys)
93             sys.setdefaultencoding('utf8')
94
95         # instantiate the xmpp client
96         self._init_client()
97
98         # register xmpp nodes for the experiment
99         self._enroll_experiment()
100         self._enroll_newexperiment()
101
102         # register xmpp logger for the experiment
103         self._enroll_logger()
104
105     def _init_client(self):
106         """ Initialize XMPP Client
107
108         """
109         jid = "%s@%s" % (self._user, self._host)
110         xmpp = OMFClient(jid, self._password)
111         # PROTOCOL_SSLv3 required for compatibility with OpenFire
112         xmpp.ssl_version = ssl.PROTOCOL_SSLv3
113
114         if xmpp.connect((self._host, self._port)):
115             xmpp.process(block=False)
116             while not xmpp.ready:
117                 time.sleep(1)
118             self._client = xmpp
119             self._message = MessageHandler(self._slice, self._user)
120         else:
121             msg = "Unable to connect to the XMPP server."
122             self.error(msg)
123             raise RuntimeError(msg)
124
125     def _enroll_experiment(self):
126         """ Create and Subscribe to the Session Topic
127
128         """
129         xmpp_node = self._exp_session_id
130         self._client.create(xmpp_node)
131         #print "Create experiment sesion id topics !!" 
132         self._client.subscribe(xmpp_node)
133         #print "Subscribe to experiment sesion id topics !!" 
134
135
136     def _enroll_newexperiment(self):
137         """ Publish New Experiment Message
138
139         """
140         address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice,
141                 self._user)
142         #print address
143         payload = self._message.newexp_function(self._user, address)
144         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
145         self._client.publish(payload, slice_sid)
146
147     def _enroll_logger(self):
148         """ Create and Subscribe to the Logger Topic
149
150         """
151         xmpp_node = self._logger_session_id
152         self._client.create(xmpp_node)
153         self._client.subscribe(xmpp_node)
154
155         payload = self._message.log_function("2", 
156                 "nodeHandler::NodeHandler", 
157                 "INFO", 
158                 "OMF Experiment Controller 5.4 (git 529a626)")
159         self._client.publish(payload, xmpp_node)
160
161     def _host_session_id(self, hostname):
162         """ Return the Topic Name as /xmpp_root/slice/user/hostname
163
164         :param hostname: Full hrn of the node
165         :type hostname: str
166
167         """
168         return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, 
169                 hostname)
170
171     def _host_resource_id(self, hostname):
172         """ Return the Topic Name as /xmpp_root/slice/resources/hostname
173
174         :param hostname: Full hrn of the node
175         :type hostname: str
176
177         """
178         return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
179
180     @property
181     def _exp_session_id(self):
182         """ Return the Topic Name as /xmpp_root/slice/user
183
184         """
185         return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
186
187     @property
188     def _logger_session_id(self):
189         """ Return the Topic Name as /xmpp_root/slice/LOGGER
190
191         """
192         return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
193
194     def delete(self, hostname):
195         """ Delete the topic corresponding to the hostname for this session
196
197         :param hostname: Full hrn of the node
198         :type hostname: str
199
200         """
201         if not hostname in self._hostnames:
202             return
203
204         self._hostnames.remove(hostname)
205
206         xmpp_node = self._host_session_id(hostname)
207         self._client.delete(xmpp_node)
208
209     def enroll_host(self, hostname):
210         """ Create and Subscribe to the session topic and the resources
211             corresponding to the hostname
212
213         :param hostname: Full hrn of the node
214         :type hostname: str
215
216         """
217         if hostname in self._hostnames:
218             return 
219
220         self._hostnames.append(hostname)
221
222         xmpp_node =  self._host_session_id(hostname)
223         self._client.create(xmpp_node)
224         self._client.subscribe(xmpp_node)
225
226         xmpp_node =  self._host_resource_id(hostname)
227         self._client.subscribe(xmpp_node)
228
229         payload = self._message.enroll_function("1", "*", "1", hostname)
230         self._client.publish(payload, xmpp_node)
231
232     def configure(self, hostname, attribute, value):
233         """ Configure attribute on the node
234
235         :param hostname: Full hrn of the node
236         :type hostname: str
237         :param attribute: Attribute that need to be configured (
238             often written as /net/wX/attribute, with X the interface number)
239         :type attribute: str
240         :param value: Value of the attribute
241         :type value: str
242
243         """
244         payload = self._message.configure_function(hostname, value, attribute)
245         xmpp_node =  self._host_session_id(hostname)
246         self._client.publish(payload, xmpp_node)
247
248     def execute(self, hostname, app_id, arguments, path, env):
249         """ Execute command on the node
250
251         :param hostname: Full hrn of the node
252         :type hostname: str
253         :param app_id: Application Id (Any id that represents in a unique 
254             way the application)
255         :type app_id: str
256         :param arguments: Arguments of the application
257         :type arguments: str
258         :param path: Path of the application
259         :type path: str
260         :param env: Environnement values for the application
261         :type env: str
262
263         """
264         payload = self._message.execute_function(hostname, app_id, arguments, 
265                 path, env)
266         xmpp_node =  self._host_session_id(hostname)
267         self._client.publish(payload, xmpp_node)
268
269     def exit(self, hostname, app_id):
270         """ Kill an application started with OMF
271
272         :param hostname: Full hrn of the node
273         :type hostname: str
274         :param app_id: Application Id of the application you want to stop
275         :type app_id: str
276
277         """
278         payload = self._message.exit_function(hostname, app_id)
279         xmpp_node =  self._host_session_id(hostname)
280         self._client.publish(payload, xmpp_node)
281
282     def release(self, hostname):
283         """ Delete the session and logger topics. Then disconnect 
284
285         """
286         if hostname in self._hostnames:
287             self.delete(hostname)
288
289     def disconnect(self) :
290         """ Delete the session and logger topics. Then disconnect 
291
292         """
293         self._client.delete(self._exp_session_id)
294         self._client.delete(self._logger_session_id)
295
296         time.sleep(1)
297         
298         # Wait the send queue to be empty before disconnect
299         self._client.disconnect(wait=True)
300         msg = " Disconnected from XMPP Server"
301         self.debug(msg)
302
303
304 class OMFAPIFactory(object):
305     """ 
306     .. note::
307
308         It allows the different RM to use the same xmpp client if they use 
309         the same credentials.  For the moment, it is focused on XMPP.
310
311     """
312     # use lock to avoid concurrent access to the Api list at the same times by 2 
313     # different threads
314     lock = threading.Lock()
315     _apis = dict()
316
317     @classmethod 
318     def get_api(cls, slice, host, port, password, exp_id = None):
319         """ Get an OMF Api
320
321         :param slice: Xmpp Slice Name
322         :type slice: str
323         :param host: Xmpp Server Adress
324         :type host: str
325         :param port: Xmpp Port (Default : 5222)
326         :type port: str
327         :param password: Xmpp Password
328         :type password: str
329
330         """
331         if slice and host and port and password:
332             key = cls._make_key(slice, host, port, password, exp_id)
333             cls.lock.acquire()
334             if key in cls._apis:
335                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
336                 cls._apis[key]['cnt'] += 1
337                 cls.lock.release()
338                 return cls._apis[key]['api']
339             else :
340                 omf_api = cls.create_api(slice, host, port, password, exp_id)
341                 cls.lock.release()
342                 return omf_api
343         return None
344
345     @classmethod 
346     def create_api(cls, slice, host, port, password, exp_id):
347         """ Create an OMF API if this one doesn't exist yet with this credentials
348
349         :param slice: Xmpp Slice Name
350         :type slice: str
351         :param host: Xmpp Server Adress
352         :type host: str
353         :param port: Xmpp Port (Default : 5222)
354         :type port: str
355         :param password: Xmpp Password
356         :type password: str
357
358         """
359         omf_api = OMFAPI(slice, host, port, password, exp_id = exp_id)
360         key = cls._make_key(slice, host, port, password, exp_id)
361         cls._apis[key] = {}
362         cls._apis[key]['api'] = omf_api
363         cls._apis[key]['cnt'] = 1
364         return omf_api
365
366     @classmethod 
367     def release_api(cls, slice, host, port, password, exp_id = None):
368         """ Release an OMF API with this credentials
369
370         :param slice: Xmpp Slice Name
371         :type slice: str
372         :param host: Xmpp Server Adress
373         :type host: str
374         :param port: Xmpp Port (Default : 5222)
375         :type port: str
376         :param password: Xmpp Password
377         :type password: str
378
379         """
380         if slice and host and port and password:
381             key = cls._make_key(slice, host, port, password, exp_id)
382             if key in cls._apis:
383                 cls._apis[key]['cnt'] -= 1
384                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
385                 if cls._apis[key]['cnt'] == 0:
386                     omf_api = cls._apis[key]['api']
387                     omf_api.disconnect()
388
389
390     @classmethod 
391     def _make_key(cls, *args):
392         """ Hash the credentials in order to create a key
393
394         :param args: list of arguments used to create the hash (user, host, port, ...)
395         :type args: list of args
396
397         """
398         skey = "".join(map(str, args))
399         return hashlib.md5(skey).hexdigest()
400
401
402