Making runId as sub folder optional for the Collector RM
[nepi.git] / src / nepi / resources / omf / omf_api.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 import ssl
22 import sys
23 import time
24 import hashlib
25 import threading
26
27 from nepi.util.logger import Logger
28
29 from nepi.resources.omf.omf_client import OMFClient
30 from nepi.resources.omf.messages_5_4 import MessageHandler
31
32 class OMFAPI(Logger):
33     """
34     .. class:: Class Args :
35       
36         :param slice: Xmpp Slice
37         :type slice: str
38         :param host: Xmpp Server
39         :type host: str
40         :param port: Xmpp Port
41         :type port: str
42         :param password: Xmpp password
43         :type password: str
44         :param xmpp_root: Root of the Xmpp Topic Architecture
45         :type xmpp_root: str
46
47     .. note::
48
49        This class is the implementation of an OMF 5.4 API. 
50        Since the version 5.4.1, the Topic Architecture start with OMF_5.4 
51        instead of OMF used for OMF5.3
52
53     """
54     def __init__(self, slice, host, port, password, xmpp_root = None, 
55             exp_id = None):
56         """
57     
58         :param slice: Xmpp Slice
59         :type slice: str
60         :param host: Xmpp Server
61         :type host: str
62         :param port: Xmpp Port
63         :type port: str
64         :param password: Xmpp password
65         :type password: str
66         :param xmpp_root: Root of the Xmpp Topic Architecture
67         :type xmpp_root: str
68
69         """
70         super(OMFAPI, self).__init__("OMFAPI")
71         self._exp_id = exp_id 
72         self._user = "%s-%s" % (slice, self._exp_id)
73         self._slice = slice
74         self._host = host
75         self._port = port
76         self._password = password
77         self._hostnames = []
78         self._xmpp_root = xmpp_root or "OMF_5.4"
79
80         # OMF xmpp client
81         self._client = None
82
83         # message handler
84         self._message = None
85
86         if sys.version_info < (3, 0):
87             reload(sys)
88             sys.setdefaultencoding('utf8')
89
90         # instantiate the xmpp client
91         self._init_client()
92
93         # register xmpp nodes for the experiment
94         self._enroll_experiment()
95         self._enroll_newexperiment()
96
97         # register xmpp logger for the experiment
98         self._enroll_logger()
99
100     def _init_client(self):
101         """ Initialize XMPP Client
102
103         """
104         jid = "%s@%s" % (self._user, self._host)
105         xmpp = OMFClient(jid, self._password)
106         # PROTOCOL_SSLv3 required for compatibility with OpenFire
107         xmpp.ssl_version = ssl.PROTOCOL_SSLv3
108
109         if xmpp.connect((self._host, self._port)):
110             xmpp.process(block=False)
111             while not xmpp.ready:
112                 time.sleep(1)
113             self._client = xmpp
114             self._message = MessageHandler(self._slice, self._user)
115         else:
116             msg = "Unable to connect to the XMPP server."
117             self.error(msg)
118             raise RuntimeError(msg)
119
120     def _enroll_experiment(self):
121         """ Create and Subscribe to the Session Topic
122
123         """
124         xmpp_node = self._exp_session_id
125         self._client.create(xmpp_node)
126         #print "Create experiment sesion id topics !!" 
127         self._client.subscribe(xmpp_node)
128         #print "Subscribe to experiment sesion id topics !!" 
129
130
131     def _enroll_newexperiment(self):
132         """ Publish New Experiment Message
133
134         """
135         address = "/%s/%s/%s/%s" % (self._host, self._xmpp_root, self._slice,
136                 self._user)
137         #print address
138         payload = self._message.newexp_function(self._user, address)
139         slice_sid = "/%s/%s" % (self._xmpp_root, self._slice)
140         self._client.publish(payload, slice_sid)
141
142     def _enroll_logger(self):
143         """ Create and Subscribe to the Logger Topic
144
145         """
146         xmpp_node = self._logger_session_id
147         self._client.create(xmpp_node)
148         self._client.subscribe(xmpp_node)
149
150         payload = self._message.log_function("2", 
151                 "nodeHandler::NodeHandler", 
152                 "INFO", 
153                 "OMF Experiment Controller 5.4 (git 529a626)")
154         self._client.publish(payload, xmpp_node)
155
156     def _host_session_id(self, hostname):
157         """ Return the Topic Name as /xmpp_root/slice/user/hostname
158
159         :param hostname: Full hrn of the node
160         :type hostname: str
161
162         """
163         return "/%s/%s/%s/%s" % (self._xmpp_root, self._slice, self._user, 
164                 hostname)
165
166     def _host_resource_id(self, hostname):
167         """ Return the Topic Name as /xmpp_root/slice/resources/hostname
168
169         :param hostname: Full hrn of the node
170         :type hostname: str
171
172         """
173         return "/%s/%s/resources/%s" % (self._xmpp_root, self._slice, hostname)
174
175     @property
176     def _exp_session_id(self):
177         """ Return the Topic Name as /xmpp_root/slice/user
178
179         """
180         return "/%s/%s/%s" % (self._xmpp_root, self._slice, self._user)
181
182     @property
183     def _logger_session_id(self):
184         """ Return the Topic Name as /xmpp_root/slice/LOGGER
185
186         """
187         return "/%s/%s/%s/LOGGER" % (self._xmpp_root, self._slice, self._user)
188
189     def delete(self, hostname):
190         """ Delete the topic corresponding to the hostname for this session
191
192         :param hostname: Full hrn of the node
193         :type hostname: str
194
195         """
196         if not hostname in self._hostnames:
197             return
198
199         self._hostnames.remove(hostname)
200
201         xmpp_node = self._host_session_id(hostname)
202         self._client.delete(xmpp_node)
203
204     def enroll_host(self, hostname):
205         """ Create and Subscribe to the session topic and the resources
206             corresponding to the hostname
207
208         :param hostname: Full hrn of the node
209         :type hostname: str
210
211         """
212         if hostname in self._hostnames:
213             return 
214
215         self._hostnames.append(hostname)
216
217         xmpp_node =  self._host_session_id(hostname)
218         self._client.create(xmpp_node)
219         self._client.subscribe(xmpp_node)
220
221         xmpp_node =  self._host_resource_id(hostname)
222         self._client.subscribe(xmpp_node)
223
224         payload = self._message.enroll_function("1", "*", "1", hostname)
225         self._client.publish(payload, xmpp_node)
226
227     def configure(self, hostname, attribute, value):
228         """ Configure attribute on the node
229
230         :param hostname: Full hrn of the node
231         :type hostname: str
232         :param attribute: Attribute that need to be configured (
233             often written as /net/wX/attribute, with X the interface number)
234         :type attribute: str
235         :param value: Value of the attribute
236         :type value: str
237
238         """
239         
240         payload = self._message.configure_function(hostname, value, attribute)
241         xmpp_node =  self._host_session_id(hostname)
242         self._client.publish(payload, xmpp_node)
243
244     
245     def send_stdin(self, hostname, value, app_id):
246         """ Send to the stdin of the application the value
247
248         :param hostname: Full hrn of the node
249         :type hostname: str
250         :param appid: Application Id (Any id that represents in a unique 
251             way the application)
252         :type appid: str
253         :param value: parameter to execute in the stdin of the application
254         :type value: str
255
256         """
257         payload = self._message.stdin_function(hostname, value, app_id)
258         xmpp_node =  self._host_session_id(hostname)
259         self._client.publish(payload, xmpp_node)
260
261
262     def execute(self, hostname, app_id, arguments, path, env):
263         """ Execute command on the node
264
265         :param hostname: Full hrn of the node
266         :type hostname: str
267         :param app_id: Application Id (Any id that represents in a unique 
268             way the application)
269         :type app_id: str
270         :param arguments: Arguments of the application
271         :type arguments: str
272         :param path: Path of the application
273         :type path: str
274         :param env: Environnement values for the application
275         :type env: str
276
277         """
278         payload = self._message.execute_function(hostname, app_id, arguments, 
279                 path, env)
280         xmpp_node =  self._host_session_id(hostname)
281         self._client.publish(payload, xmpp_node)
282
283     def exit(self, hostname, app_id):
284         """ Kill an application started with OMF
285
286         :param hostname: Full hrn of the node
287         :type hostname: str
288         :param app_id: Application Id of the application you want to stop
289         :type app_id: str
290
291         """
292         payload = self._message.exit_function(hostname, app_id)
293         xmpp_node =  self._host_session_id(hostname)
294         self._client.publish(payload, xmpp_node)
295
296     def release(self, hostname):
297         """ Delete the session and logger topics. Then disconnect 
298
299         """
300         if hostname in self._hostnames:
301             self.delete(hostname)
302
303     def disconnect(self) :
304         """ Delete the session and logger topics. Then disconnect 
305
306         """
307         self._client.delete(self._exp_session_id)
308         self._client.delete(self._logger_session_id)
309
310         time.sleep(1)
311         
312         # Wait the send queue to be empty before disconnect
313         self._client.disconnect(wait=True)
314         msg = " Disconnected from XMPP Server"
315         self.debug(msg)
316
317
318 class OMFAPIFactory(object):
319     """ 
320     .. note::
321
322         It allows the different RM to use the same xmpp client if they use 
323         the same credentials.  For the moment, it is focused on XMPP.
324
325     """
326     # use lock to avoid concurrent access to the Api list at the same times by 2 
327     # different threads
328     lock = threading.Lock()
329     _apis = dict()
330
331     @classmethod 
332     def get_api(cls, slice, host, port, password, exp_id = None):
333         """ Get an OMF Api
334
335         :param slice: Xmpp Slice Name
336         :type slice: str
337         :param host: Xmpp Server Adress
338         :type host: str
339         :param port: Xmpp Port (Default : 5222)
340         :type port: str
341         :param password: Xmpp Password
342         :type password: str
343
344         """
345         if slice and host and port and password:
346             key = cls._make_key(slice, host, port, password, exp_id)
347             cls.lock.acquire()
348             if key in cls._apis:
349                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
350                 cls._apis[key]['cnt'] += 1
351                 cls.lock.release()
352                 return cls._apis[key]['api']
353             else :
354                 omf_api = cls.create_api(slice, host, port, password, exp_id)
355                 cls.lock.release()
356                 return omf_api
357         return None
358
359     @classmethod 
360     def create_api(cls, slice, host, port, password, exp_id):
361         """ Create an OMF API if this one doesn't exist yet with this credentials
362
363         :param slice: Xmpp Slice Name
364         :type slice: str
365         :param host: Xmpp Server Adress
366         :type host: str
367         :param port: Xmpp Port (Default : 5222)
368         :type port: str
369         :param password: Xmpp Password
370         :type password: str
371
372         """
373         omf_api = OMFAPI(slice, host, port, password, exp_id = exp_id)
374         key = cls._make_key(slice, host, port, password, exp_id)
375         cls._apis[key] = {}
376         cls._apis[key]['api'] = omf_api
377         cls._apis[key]['cnt'] = 1
378         return omf_api
379
380     @classmethod 
381     def release_api(cls, slice, host, port, password, exp_id = None):
382         """ Release an OMF API with this credentials
383
384         :param slice: Xmpp Slice Name
385         :type slice: str
386         :param host: Xmpp Server Adress
387         :type host: str
388         :param port: Xmpp Port (Default : 5222)
389         :type port: str
390         :param password: Xmpp Password
391         :type password: str
392
393         """
394         if slice and host and port and password:
395             key = cls._make_key(slice, host, port, password, exp_id)
396             if key in cls._apis:
397                 cls._apis[key]['cnt'] -= 1
398                 #print "Api Counter : " + str(cls._apis[key]['cnt'])
399                 if cls._apis[key]['cnt'] == 0:
400                     omf_api = cls._apis[key]['api']
401                     omf_api.disconnect()
402
403
404     @classmethod 
405     def _make_key(cls, *args):
406         """ Hash the credentials in order to create a key
407
408         :param args: list of arguments used to create the hash (user, host, port, ...)
409         :type args: list of args
410
411         """
412         skey = "".join(map(str, args))
413         return hashlib.md5(skey).hexdigest()
414
415
416