2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from sleekxmpp.exceptions import IqError, IqTimeout
24 import xml.etree.ElementTree as ET
28 # inherit from BaseXmpp and XMLStream classes
29 class OMFClient(sleekxmpp.ClientXMPP):
31 .. class:: Class Args :
33 :param jid: Jabber Id (= Xmpp Slice + Date)
35 :param password: Jabber Password (= Xmpp Password)
40 This class is an XMPP Client with customized method
44 def __init__(self, jid, password):
47 :param jid: Jabber Id (= Xmpp Slice + Date)
49 :param password: Jabber Password (= Xmpp Password)
54 sleekxmpp.ClientXMPP.__init__(self, jid, password)
56 self._registered = False
59 self.register_plugin('xep_0077') # In-band registration
60 self.register_plugin('xep_0030')
61 self.register_plugin('xep_0059')
62 self.register_plugin('xep_0060') # PubSub
64 self.add_event_handler("session_start", self.start)
65 self.add_event_handler("register", self.register)
66 self.add_event_handler("pubsub_publish", self.handle_omf_message)
68 self._logger = logging.getLogger("nepi.omf.xmppClient")
69 self._logger.setLevel(nepi.LOGLEVEL)
73 """ Check if the client is ready
78 def start(self, event):
79 """ Send presence to the Xmppp Server. This function is called directly by the sleekXmpp library
84 self._server = "pubsub.%s" % self.boundjid.domain
86 def register(self, iq):
87 """ Register to the Xmppp Server. This function is called directly by the sleekXmpp library
91 self._logger.info(" %s already registered!" % self.boundjid)
96 resp['register']['username'] = self.boundjid.user
97 resp['register']['password'] = self.password
101 self._logger.info(" Account created for %s!" % self.boundjid)
102 self._registered = True
104 self._logger.error(" Could not register account: %s" %
105 e.iq['error']['text'])
107 self._logger.error(" No response from server.")
109 def unregister(self):
110 """ Unregister from the Xmppp Server.
114 self.plugin['xep_0077'].cancel_registration(
115 ifrom=self.boundjid.full)
116 self._logger.info(" Account unregistered for %s!" % self.boundjid)
118 self._logger.error(" Could not unregister account: %s" %
119 e.iq['error']['text'])
121 self._logger.error(" No response from server.")
124 """ Get all the nodes of the Xmppp Server.
128 result = self['xep_0060'].get_nodes(self._server)
129 for item in result['disco_items']['items']:
130 self._logger.info(' - %s' % str(item))
133 error = traceback.format_exc()
134 self._logger.error(' Could not retrieve node list.\ntraceback:\n%s', error)
136 def subscriptions(self):
137 """ Get all the subscriptions of the Xmppp Server.
141 result = self['xep_0060'].get_subscriptions(self._server)
143 for node in result['node']:
144 self._logger.info(' - %s' % str(node))
147 error = traceback.format_exc()
148 self._logger.error(' Could not retrieve subscriptions.\ntraceback:\n%s', error)
150 def create(self, node):
151 """ Create the topic corresponding to the node
153 :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17)
157 self._logger.debug(" Create Topic : " + node)
159 config = self['xep_0004'].makeForm('submit')
160 config.add_field(var='pubsub#node_type', value='leaf')
161 config.add_field(var='pubsub#notify_retract', value='0')
162 config.add_field(var='pubsub#publish_model', value='open')
163 config.add_field(var='pubsub#persist_items', value='1')
164 config.add_field(var='pubsub#max_items', value='1')
165 config.add_field(var='pubsub#title', value=node)
168 self['xep_0060'].create_node(self._server, node, config = config)
170 error = traceback.format_exc()
171 self._logger.error(' Could not create topic: %s\ntraceback:\n%s' % (node, error))
173 def delete(self, node):
174 """ Delete the topic corresponding to the node
176 :param node: Name of the topic, corresponding to the node (ex : omf.plexus.wlab17)
180 # To check if the queue are well empty at the end
181 #print " length of the queue : " + str(self.send_queue.qsize())
182 #print " length of the queue : " + str(self.event_queue.qsize())
184 self['xep_0060'].delete_node(self._server, node)
185 self._logger.info(' Deleted node: %s' % node)
187 error = traceback.format_exc()
188 self._logger.error(' Could not delete topic: %s\ntraceback:\n%s' % (node, error))
190 def publish(self, data, node):
191 """ Publish the data to the corresponding topic
193 :param data: Data that will be published
195 :param node: Name of the topic
200 self._logger.debug(" Publish to Topic : " + node)
202 result = self['xep_0060'].publish(self._server,node,payload=data)
203 # id = result['pubsub']['publish']['item']['id']
204 # print('Published at item id: %s' % id)
206 error = traceback.format_exc()
207 self._logger.error(' Could not publish to: %s\ntraceback:\n%s' \
213 :param data: data from which the items will be get back
219 result = self['xep_0060'].get_item(self._server, self.boundjid,
221 for item in result['pubsub']['items']['substanzas']:
222 self._logger.info('Retrieved item %s: %s' % (item['id'],
223 tostring(item['payload'])))
225 error = traceback.format_exc()
226 self._logger.error(' Could not retrieve item %s from topic %s\ntraceback:\n%s' \
227 % (data, self.boundjid, error))
229 def retract(self, data):
232 :param data: data from which the item will be retracted
237 result = self['xep_0060'].retract(self._server, self.boundjid, data)
238 self._logger.info(' Retracted item %s from topic %s' % (data, self.boundjid))
240 error = traceback.format_exc()
241 self._logger.error(' Could not retract item %s from topic %s\ntraceback:\n%s' \
242 % (data, self.boundjid, error))
245 """ Purge the information in the server
249 result = self['xep_0060'].purge(self._server, self.boundjid)
250 self._logger.info(' Purged all items from topic %s' % self.boundjid)
252 error = traceback.format_exc()
253 self._logger.error(' Could not purge items from topic %s\ntraceback:\n%s' \
254 % (self.boundjid, error))
256 def subscribe(self, node):
257 """ Subscribe to a topic
259 :param node: Name of the topic
264 result = self['xep_0060'].subscribe(self._server, node)
265 #self._logger.debug('Subscribed %s to node %s' \
266 #% (self.boundjid.bare, node))
267 self._logger.info(' Subscribed %s to topic %s' \
268 % (self.boundjid.user, node))
270 error = traceback.format_exc()
271 self._logger.error(' Could not subscribe %s to topic %s\ntraceback:\n%s' \
272 % (self.boundjid.bare, node, error))
274 def unsubscribe(self, node):
275 """ Unsubscribe to a topic
277 :param node: Name of the topic
282 result = self['xep_0060'].unsubscribe(self._server, node)
283 self._logger.info(' Unsubscribed %s from topic %s' % (self.boundjid.bare, node))
285 error = traceback.format_exc()
286 self._logger.error(' Could not unsubscribe %s from topic %s\ntraceback:\n%s' \
287 % (self.boundjid.bare, node, error))
289 def _check_for_tag(self, root, namespaces, tag):
290 """ Check if an element markup is in the ElementTree
292 :param root: Root of the tree
293 :type root: ElementTree Element
294 :param namespaces: Namespaces of the element
295 :type namespaces: str
296 :param tag: Tag that will search in the tree
300 for element in root.iter(namespaces+tag):
306 def _check_output(self, root, namespaces):
307 """ Check the significative element in the answer and display it
309 :param root: Root of the tree
310 :type root: ElementTree Element
311 :param namespaces: Namespaces of the tree
312 :type namespaces: str
315 fields = ["TARGET", "REASON", "PATH", "APPID", "VALUE"]
318 msg = self._check_for_tag(root, namespaces, elt)
320 response = response + " " + msg.text + " :"
321 deb = self._check_for_tag(root, namespaces, "MESSAGE")
323 self._logger.debug(response + " " + deb.text)
325 self._logger.info(response)
327 def handle_omf_message(self, iq):
328 """ Handle published/received message
330 :param iq: Stanzas that is currently published/received
334 namespaces = "{http://jabber.org/protocol/pubsub}"
335 for i in iq['pubsub_event']['items']:
336 root = ET.fromstring(str(i))
337 self._check_output(root, namespaces)