Making runId as sub folder optional for the Collector RM
[nepi.git] / src / nepi / resources / omf / omf_client.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Julien Tribino <julien.tribino@inria.fr>
20
21 from nepi.util.logger import Logger
22
23 try:
24     import sleekxmpp
25     from sleekxmpp.exceptions import IqError, IqTimeout
26     class BaseOMFClient(sleekxmpp.ClientXMPP):
27         pass
28 except ImportError:
29     msg = ("SleekXMPP is not installed. Without this library "
30           "you will be not able to use OMF Resources "
31           "if you want to install SleekXmpp: \n"
32           " git clone -b develop git://github.com/fritzy/SleekXMPP.git \n"
33           " cd SleekXMPP \n"
34           " sudo python setup.py install\n")
35
36     logger = Logger("BaseOMFClient")
37     logger.debug(msg)
38
39     class BaseOMFClient(object):
40         pass
41
42 import traceback
43 import xml.etree.ElementTree as ET
44
45 # inherit from BaseXmpp and XMLstream classes
46 class OMFClient(BaseOMFClient, Logger): 
47     """
48     .. class:: Class Args :
49       
50         :param jid: Jabber Id (= Xmpp Slice + Date)
51         :type jid: str
52         :param password: Jabber Password (= Xmpp Password)
53         :type password: str
54
55     .. note::
56
57        This class is an XMPP Client with customized method
58
59     """
60
61     def __init__(self, jid, password):
62         """
63
64         :param jid: Jabber Id (= Xmpp Slice + Date)
65         :type jid: str
66         :param password: Jabber Password (= Xmpp Password)
67         :type password: str
68
69
70         """
71         Logger.__init__(self, "OMFClient")
72
73         sleekxmpp.ClientXMPP.__init__(self, jid, password)
74         self._ready = False
75         self._registered = False
76         self._server = None
77
78         self.register_plugin('xep_0077') # In-band registration
79         self.register_plugin('xep_0030')
80         self.register_plugin('xep_0059')
81         self.register_plugin('xep_0060') # PubSub 
82
83         self.add_event_handler("session_start", self.start)
84         self.add_event_handler("register", self.register)
85         self.add_event_handler("pubsub_publish", self.handle_omf_message)
86         
87     @property
88     def ready(self):
89         """ Check if the client is ready
90
91         """
92         return self._ready
93
94     def start(self, event):
95         """ Send presence to the Xmppp Server. This function is called directly by the sleekXmpp library
96
97         """
98         self.send_presence()
99         self._ready = True
100         self._server = "pubsub.%s" % self.boundjid.domain
101
102     def register(self, iq):
103         """  Register to the Xmppp Server. This function is called directly by the sleekXmpp library
104
105         """
106         if self._registered:
107             msg = " %s already registered!" % self.boundjid
108             self.info(msg)
109             return 
110
111         resp = self.Iq()
112         resp['type'] = 'set'
113         resp['register']['username'] = self.boundjid.user
114         resp['register']['password'] = self.password
115
116         try:
117             resp.send(now=True)
118             msg = " Account created for %s!" % self.boundjid
119             self.info(msg)
120             self._registered = True
121         except IqError as e:
122             msg = " Could not register account: %s" % e.iq['error']['text']
123             self.error(msg)
124         except IqTimeout:
125             msg = " No response from server."
126             self.error(msg)
127
128     def unregister(self):
129         """  Unregister from the Xmppp Server.
130
131         """
132         try:
133             self.plugin['xep_0077'].cancel_registration(
134                 ifrom=self.boundjid.full)
135             msg = " Account unregistered for %s!" % self.boundjid
136             self.info(msg)
137         except IqError as e:
138             msg = " Could not unregister account: %s" % e.iq['error']['text']
139             self.error(msg)
140         except IqTimeout:
141             msg = " No response from server."
142             self.error(msg)
143
144     def nodes(self):
145         """  Get all the nodes of the Xmppp Server.
146
147         """
148         try:
149             result = self['xep_0060'].get_nodes(self._server)
150             for item in result['disco_items']['items']:
151                 msg = ' - %s' % str(item)
152                 self.debug(msg)
153             return result
154         except:
155             error = traceback.format_exc()
156             msg = 'Could not retrieve node list.\ntraceback:\n%s' % error
157             self.error(msg)
158
159     def subscriptions(self):
160         """  Get all the subscriptions of the Xmppp Server.
161
162         """
163         try:
164             result = self['xep_0060'].get_subscriptions(self._server)
165                 #self.boundjid.full)
166             for node in result['node']:
167                 msg = ' - %s' % str(node)
168                 self.debug(msg)
169             return result
170         except:
171             error = traceback.format_exc()
172             msg = ' Could not retrieve subscriptions.\ntraceback:\n%s' % error
173             self.error(msg)
174
175     def create(self, node):
176         """  Create the topic corresponding to the node
177
178         :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17)
179         :type node: str
180
181         """
182         msg = " Create Topic : " + node
183         self.info(msg)
184    
185         config = self['xep_0004'].makeForm('submit')
186         config.add_field(var='pubsub#node_type', value='leaf')
187         config.add_field(var='pubsub#notify_retract', value='0')
188         config.add_field(var='pubsub#publish_model', value='open')
189         config.add_field(var='pubsub#persist_items', value='1')
190         config.add_field(var='pubsub#max_items', value='1')
191         config.add_field(var='pubsub#title', value=node)
192
193         try:
194             self['xep_0060'].create_node(self._server, node, config = config)
195         except:
196             error = traceback.format_exc()
197             msg = ' Could not create topic: %s\ntraceback:\n%s' % (node, error)
198             self.error(msg)
199
200     def delete(self, node):
201         """  Delete the topic corresponding to the node
202
203         :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17)
204         :type node: str
205
206         """
207         # To check if the queue are well empty at the end
208         #print " length of the queue : " + str(self.send_queue.qsize())
209         #print " length of the queue : " + str(self.event_queue.qsize())
210         try:
211             self['xep_0060'].delete_node(self._server, node)
212             msg = ' Deleted node: %s' % node
213             self.info(msg)
214         except:
215             error = traceback.format_exc()
216             msg = ' Could not delete topic: %s\ntraceback:\n%s' % (node, error)
217             self.error(msg)
218     
219     def publish(self, data, node):
220         """  Publish the data to the corresponding topic
221
222         :param data: Data that will be published
223         :type data: str
224         :param node: Name of the topic
225         :type node: str
226
227         """ 
228
229         msg = " Publish to Topic : " + node
230         self.info(msg)
231         try:
232             result = self['xep_0060'].publish(self._server,node,payload=data)
233             # id = result['pubsub']['publish']['item']['id']
234             # print('Published at item id: %s' % id)
235         except:
236             error = traceback.format_exc()
237             msg = ' Could not publish to: %s\ntraceback:\n%s' % (node, error)
238             self.error(msg)
239
240     def get(self, data):
241         """  Get the item
242
243         :param data: data from which the items will be get back
244         :type data: str
245
246
247         """
248         try:
249             result = self['xep_0060'].get_item(self._server, self.boundjid,
250                 data)
251             for item in result['pubsub']['items']['substanzas']:
252                 msg = 'Retrieved item %s: %s' % (item['id'], tostring(item['payload']))
253                 self.debug(msg)
254         except:
255             error = traceback.format_exc()
256             msg = ' Could not retrieve item %s from topic %s\ntraceback:\n%s' \
257                     % (data, self.boundjid, error)
258             self.error(msg)
259
260     def retract(self, data):
261         """  Retract the item
262
263         :param data: data from which the item will be retracted
264         :type data: str
265
266         """
267         try:
268             result = self['xep_0060'].retract(self._server, self.boundjid, data)
269             msg = ' Retracted item %s from topic %s' % (data, self.boundjid)
270             self.debug(msg)
271         except:
272             error = traceback.format_exc()
273             msg = 'Could not retract item %s from topic %s\ntraceback:\n%s' \
274                     % (data, self.boundjid, error)
275             self.error(msg)
276
277     def purge(self):
278         """  Purge the information in the server
279
280         """
281         try:
282             result = self['xep_0060'].purge(self._server, self.boundjid)
283             msg = ' Purged all items from topic %s' % self.boundjid
284             self.debug(msg)
285         except:
286             error = traceback.format_exc()
287             msg = ' Could not purge items from topic %s\ntraceback:\n%s' \
288                     % (self.boundjid, error)
289             self.error(msg)
290
291     def subscribe(self, node):
292         """ Subscribe to a topic
293
294         :param node: Name of the topic
295         :type node: str
296
297         """
298         try:
299             result = self['xep_0060'].subscribe(self._server, node)
300             msg = ' Subscribed %s to topic %s' \
301                     % (self.boundjid.user, node)
302             #self.info(msg)
303             self.debug(msg)
304         except:
305             error = traceback.format_exc()
306             msg = ' Could not subscribe %s to topic %s\ntraceback:\n%s' \
307                     % (self.boundjid.bare, node, error)
308             self.error(msg)
309
310     def unsubscribe(self, node):
311         """ Unsubscribe to a topic
312
313         :param node: Name of the topic
314         :type node: str
315
316         """
317         try:
318             result = self['xep_0060'].unsubscribe(self._server, node)
319             msg = ' Unsubscribed %s from topic %s' % (self.boundjid.bare, node)
320             self.debug(msg)
321         except:
322             error = traceback.format_exc()
323             msg = ' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
324                     % (self.boundjid.bare, node, error)
325             self.error(msg)
326
327     def _check_for_tag(self, root, namespaces, tag):
328         """  Check if an element markup is in the ElementTree
329
330         :param root: Root of the tree
331         :type root: ElementTree Element
332         :param namespaces: Namespaces of the element
333         :type namespaces: str
334         :param tag: Tag that will search in the tree
335         :type tag: str
336
337         """
338         for element in root.iter(namespaces+tag):
339             if element.text:
340                 return element
341             else : 
342                 return None    
343
344     def _check_output(self, root, namespaces):
345         """ Check the significative element in the answer and display it
346
347         :param root: Root of the tree
348         :type root: ElementTree Element
349         :param namespaces: Namespaces of the tree
350         :type namespaces: str
351
352         """
353         fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
354         response = ""
355         for elt in fields:
356             msg = self._check_for_tag(root, namespaces, elt)
357             if msg is not None:
358                 response = response + " " + msg.text + " :"
359         deb = self._check_for_tag(root, namespaces, "MESSAGE")
360         if deb is not None:
361             msg = response + " " + deb.text
362             self.debug(msg)
363         else :
364             self.info(response)
365
366     def handle_omf_message(self, iq):
367         """ Handle published/received message 
368
369         :param iq: Stanzas that is currently published/received
370         :type iq: Iq Stanza
371
372         """
373         namespaces = "{http://jabber.org/protocol/pubsub}"
374         for i in iq['pubsub_event']['items']:
375             root = ET.fromstring(str(i))
376             self._check_output(root, namespaces)
377
378