2 # Baris Metin <tmetin@sophia.inria.fr>
7 from twisted.words.xish import domish
8 from twisted.web import xmlrpc, server
9 from twisted.internet import reactor, task
10 from twisted.words.protocols.jabber import xmlstream, client, jid
12 sys.path.append("/usr/share/plc_api/")
13 from PLC.Config import Config
16 class BaseClient(object):
17 """ Base XMPP client: handles authentication and basic presence/message requests. """
18 def __init__(self, id, secret, verbose = False, log = None):
20 if isinstance(id, (str, unicode)):
22 x = client.XMPPClientFactory(id, secret)
23 x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
24 x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
25 x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
26 x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)
29 self.verbose = verbose
30 self.log = log or sys.stdout
def __rawDataIN(self, buf):
    """Trace hook for data received on the XML stream.

    When ``self.verbose`` is truthy, ``buf`` is passed to ``self.msg``
    prefixed with ``RECV: ``; otherwise nothing happens.
    """
    if not self.verbose:
        return
    self.msg("RECV: %s" % buf)
def __rawDataOUT(self, buf):
    """Trace hook for data written to the XML stream.

    When ``self.verbose`` is truthy, ``buf`` is passed to ``self.msg``
    prefixed with ``SEND: ``; otherwise nothing happens.
    """
    if not self.verbose:
        return
    self.msg("SEND: %s" % buf)
39 self.log.write("%s\n" % msg)
43 self.msg("ERROR: %s" % msg)
46 self.msg("WARN: %s" % msg)
49 self.msg("INFO: %s" % msg)
51 def event_connected(self, xs):
53 xs.rawDataInFn = self.__rawDataIN
54 xs.rawDataOutFn = self.__rawDataOUT
57 def event_disconnected(self, xs):
def event_init_failed(self, xs):
    """Callback registered for ``xmlstream.INIT_FAILED_EVENT``.

    Reports the failure through ``self.error``. The ``xs`` argument is
    accepted to satisfy the callback signature but is not used.
    """
    failure_text = "Init Failed"
    self.error(failure_text)
63 def event_authenticated(self, xs):
64 presence = domish.Element(("jabber:client", "presence"))
65 presence.addElement("show", content="dnd")
66 presence.addElement("status", content="man at work")
69 # add protocol handlers
70 xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
71 xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
72 xs.addObserver("/presence", self.presence)
73 xs.addObserver("/message[@type='chat']", self.message_chat)
def presence_subscribe(self, m):
    """Grant an incoming presence subscription request.

    Logs the requester through ``self.info`` and sends back a presence
    stanza of type ``subscribed`` whose ``from``/``to`` are the ``to``/``from``
    of the request ``m``.
    """
    self.info("%s request to add us, granting." % m['from'])
    grant = domish.Element(("jabber:client", "presence"))
    # Swap the addresses so the answer goes back to whoever asked.
    grant['from'] = m['to']
    grant['to'] = m['from']
    grant['type'] = "subscribed"
    self.xmlstream.send(grant)
82 def presence_unsubscribe(self, m):
84 self.info("%s removed us, trying to re-authenticate." % m['from'])
85 p = domish.Element(("jabber:client", "presence"))
86 p['from'], p['to'] = m['to'], m['from']
87 p['type'] = "subscribe"
88 self.xmlstream.send(p)
def presence(self, m):
    """Answer a presence stanza with our own "busy" presence.

    The reply is addressed back to the sender of ``m`` (``from``/``to``
    swapped) and carries a ``show`` of ``dnd`` and a ``status`` of
    ``man at work``.
    """
    reply = domish.Element(("jabber:client", "presence"))
    reply['from'] = m['to']
    reply['to'] = m['from']
    # Historical note (Thierry): these calls used to read
    # presence.addElement; pointing them at the reply element was a guess.
    for tag, text in (("show", "dnd"), ("status", "man at work")):
        reply.addElement(tag, content=text)
    self.xmlstream.send(reply)
98 def message_chat(self, m):
99 n = domish.Element((None, "message"))
101 n['from'] = self.id.full()
102 n.addElement("body", content = "don't have time to chat. working!")
103 self.xmlstream.send(n)
106 class PubSubClient(BaseClient):
107 """ PubSub (XEP 0060) implementation """
109 def __init__(self, id, secret, verbose = False, log = None):
110 BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
def add_result_hook(self, hook_to, hook):
    """Store ``hook`` in ``self.hooks`` under the key ``hook_to``.

    A hook already stored under the same key is replaced.
    """
    registry = self.hooks
    registry[hook_to] = hook
def delete_result_hook(self, hook_to):
    """Remove the hook stored under ``hook_to`` from ``self.hooks``.

    Keys that are not present are silently ignored, so the method can be
    called unconditionally after a hook has (or has not) been installed.
    """
    # dict.pop with a default does the "delete if present" in one lookup and
    # avoids dict.has_key(), which is deprecated and absent in Python 3.
    self.hooks.pop(hook_to, None)
120 def event_authenticated(self, xs):
121 BaseClient.event_authenticated(self, xs)
123 xs.addObserver("/iq/pubsub/create", self.result_create_node)
124 xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
125 xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
126 xs.addObserver("/iq/pubsub/subscription[@subscription='subscribed']", self.result_subscribe_to_node)
127 xs.addObserver("/iq/pubsub/configure/x", self.result_configure_node)
128 xs.addObserver("/iq/pubsub/configure/error", self.result_configure_node)
130 def __iq(self, t="get"):
131 iq = domish.Element((None, "iq"))
132 iq['from'] = self.id.full()
133 iq['to'] = "pubsub.%s" % self.id.host
138 def __add_pubsub(self, iq):
139 pubsub = iq.addElement("pubsub")
140 pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
144 def discover(self, node = None):
145 iq = self.__iq("get")
146 query = iq.addElement("query")
147 query['xmlns'] = "http://jabber.org/protocol/disco#items"
150 self.requests[iq['id']] = node
151 self.xmlstream.send(iq)
153 def result_discover(self, iq):
154 hook = self.hooks.get('discover', None)
157 self.delete_result_hook('discover')
159 self.requests.pop(iq['id'])
def subscribe_to_node(self, node):
    """Send a pubsub ``subscribe`` request for ``node``.

    The request is an iq of type ``set`` whose ``subscribe`` element names
    the node and our full JID. Before sending, ``node`` is recorded in
    ``self.requests`` under the iq's ``id``.
    """
    request = self.__iq("set")
    subscribe = self.__add_pubsub(request).addElement("subscribe")
    subscribe['node'] = node
    subscribe['jid'] = self.id.full()
    self.requests[request['id']] = node
    self.xmlstream.send(request)
def result_subscribe_to_node(self, iq):
    """Handle the reply to a subscribe request.

    Drops the entry recorded in ``self.requests`` for the reply's ``id``.
    Raises ``KeyError`` if no entry exists for that id.
    """
    request_id = iq['id']
    del self.requests[request_id]
def publish_to_node(self, node, payload):
    """Send a pubsub ``publish`` request carrying one item to ``node``.

    ``payload`` is passed as the content of the single ``item`` element.
    Before sending, ``node`` is recorded in ``self.requests`` under the
    iq's ``id``.
    """
    request = self.__iq("set")
    publish = self.__add_pubsub(request).addElement("publish")
    publish['node'] = node
    # The created <item/> element is not needed afterwards, so it is not
    # bound to a name.
    publish.addElement("item", content=payload)
    # NOTE(review): no observer for publish replies appears among the
    # visible addObserver registrations, so this entry may never be
    # removed -- confirm.
    self.requests[request['id']] = node
    self.xmlstream.send(request)
def result_publish_to_node(self, iq):
    """Handle the reply to a publish request.

    Drops the entry recorded in ``self.requests`` for the reply's ``id``.
    Raises ``KeyError`` if no entry exists for that id.
    """
    request_id = iq['id']
    del self.requests[request_id]
188 # TODO: ejabberd doesn't have the node configuration feature implemented yet!
189 def configure_node(self, node, fields=None):
190 iq = self.__iq("set")
191 pubsub = self.__add_pubsub(iq)
192 configure = pubsub.addElement("configure")
193 configure['node'] = node
197 self.requests[iq['id']] = node
198 self.xmlstream.send(iq)
200 def result_configure_node(self, iq):
201 hook = self.hooks.get('configure', None)
204 self.delete_result_hook('configure')
206 self.requests.pop(iq['id'])
209 def create_node(self, node = None):
210 iq = self.__iq("set")
211 pubsub = self.__add_pubsub(iq)
212 create = pubsub.addElement("create")
214 create['node'] = node
215 configure = pubsub.addElement("configure")
216 self.requests[iq['id']] = node
217 self.xmlstream.send(iq)
219 def result_create_node(self, iq):
220 node = self.requests[iq['id']]
222 if iq.error.conflict:
223 # node is already there, nothing important.
224 self.warn("NodeID exists: %s" % node)
228 err_type = iq.error['type']
229 self.error("Can not create node: %s (error type: %s)" % (node, err_type))
232 # try subscribing to the node for debugging purposes
233 self.subscribe_to_node(node)
235 self.requests.pop(iq['id'])
def delete_node(self, node):
    """Send a pubsub ``delete`` request for ``node``.

    The request is an iq of type ``set``. Before sending, ``node`` is
    recorded in ``self.requests`` under the iq's ``id``.
    """
    request = self.__iq("set")
    removal = self.__add_pubsub(request).addElement("delete")
    removal['node'] = node
    self.requests[request['id']] = node
    self.xmlstream.send(request)
def result_delete_node(self, iq):
    """Handle the reply to a delete request.

    Drops the entry recorded in ``self.requests`` for the reply's ``id``.
    Raises ``KeyError`` if no entry exists for that id.
    """
    request_id = iq['id']
    del self.requests[request_id]
249 def message_chat(self, m):
251 for e in m.elements():
257 # node = m.event.items['node']
258 # n = domish.Element((None, "message"))
259 # n.addElement("body", content = "published to: %s\n%s" % (node, m.event.items.toXml()))
260 # # for each listener in promiscuous mode send the published event
261 # self.xmlstream.send(n)
264 # # not a pubsub message continue on
267 if body == "list groups":
270 for i in iq.query.elements():
271 reply += "%s\n" % i['node']
272 n = domish.Element((None, "message"))
274 n['from'] = self.id.full()
275 n.addElement("body", content = reply)
276 self.xmlstream.send(n)
278 self.add_result_hook("discover", list_groups)
281 elif body.startswith("configuration"):
282 # "configuration NODE"
285 node = body.split()[1].strip()
289 def get_configuration(iq):
291 n = domish.Element((None, "message"))
293 n['from'] = self.id.full()
294 n.addElement("body", content = reply)
295 self.xmlstream.send(n)
297 self.add_result_hook("configure", get_configuration)
298 self.configure_node(node)
301 BaseClient.message_chat(self, m)
304 class Slicemgr(xmlrpc.XMLRPC, PubSubClient):
307 RESOURCES = 'resources'
309 def __init__(self, id, secret, verbose = False, log = None):
310 xmlrpc.XMLRPC.__init__(self, allowNone=True)
311 PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
312 self.command_queue = Queue.Queue()
314 xmlrpc.addIntrospection(self)
def xmlrpc_createSlice(self, slice):
    """XML-RPC entry point: delegate to ``create_slice`` for ``slice``.

    Returns ``None``; whatever ``create_slice`` returns is discarded.
    """
    self.create_slice(slice)
    return None
def xmlrpc_addResource(self, slice, resource):
    """XML-RPC entry point: delegate to ``add_resource``.

    Returns ``None``; whatever ``add_resource`` returns is discarded.
    """
    self.add_resource(slice, resource)
    return None
def xmlrpc_deleteSlice(self, slice):
    """XML-RPC entry point: delegate to ``delete_slice`` for ``slice``.

    Returns ``None``; whatever ``delete_slice`` returns is discarded.
    """
    self.delete_slice(slice)
    return None
def xmlrpc_removeResource(self, slice, resource):
    """XML-RPC entry point: delegate to ``delete_resource``.

    Returns ``None``; whatever ``delete_resource`` returns is discarded.
    """
    self.delete_resource(slice, resource)
    return None
329 def flush_commands(self):
330 # self.info("Running %d commands" % self.command_queue.qsize())
331 while not self.command_queue.empty():
332 (meth, param) = self.command_queue.get()
def create_slice(self, slice):
    """Queue creation of the pubsub nodes for ``slice``.

    Two ``(self.create_node, path)`` commands are put on
    ``self.command_queue``, in this order: ``DOMAIN/slice`` and then
    ``DOMAIN/slice/RESOURCES``. Nothing is sent from here; the commands are
    only queued.
    """
    enqueue = self.command_queue.put
    slice_node = "/".join((self.DOMAIN, slice))
    enqueue((self.create_node, slice_node))
    # The resources container lives directly below the slice node.
    enqueue((self.create_node, "/".join((slice_node, self.RESOURCES))))
def add_resource(self, slice, resource):
    """Queue creation of the pubsub node for ``resource`` inside ``slice``.

    Puts one ``(self.create_node, path)`` command on ``self.command_queue``
    where ``path`` is ``DOMAIN/slice/RESOURCES/resource``.
    """
    path_parts = (self.DOMAIN, slice, self.RESOURCES, resource)
    command = (self.create_node, "/".join(path_parts))
    self.command_queue.put(command)
342 def delete_slice(self, slice):
343 slice_prefix = "/".join([self.DOMAIN,slice])
344 resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])
345 def delete_slice_resources(iq):
346 for i in iq.query.elements():
348 if node.startswith(resource_prefix):
349 self.command_queue.put((self.delete_node, node))
351 self.add_result_hook("discover", delete_slice_resources)
354 self.command_queue.put(( self.delete_node, resource_prefix) )
355 self.command_queue.put(( self.delete_node, slice_prefix) )
def delete_resource(self, slice, resource):
    """Queue deletion of the pubsub node for ``resource`` inside ``slice``.

    Puts one ``(self.delete_node, path)`` command on ``self.command_queue``
    where ``path`` is ``DOMAIN/slice/RESOURCES/resource``.
    """
    path_parts = (self.DOMAIN, slice, self.RESOURCES, resource)
    command = (self.delete_node, "/".join(path_parts))
    self.command_queue.put(command)
362 if __name__ == "__main__":
364 config = Config("/etc/planetlab/plc_config")
366 xmppserver = config.PLC_OMF_XMPP_SERVER
367 xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
368 xmpppass = config.PLC_OMF_XMPP_PASSWORD
369 slicemgr = Slicemgr(xmppuser, xmpppass,
370 log=open("/var/log/omf/pubsub_client.log", "a"),
373 t = task.LoopingCall(slicemgr.flush_commands)
374 t.start(5.0) # check every 5 seconds
376 reactor.callLater(1, slicemgr.create_node, "/OMF")
377 reactor.callLater(1, slicemgr.create_node, "/OMF/SYSTEM")
379 reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
380 reactor.listenTCP(5053, server.Site(slicemgr), interface="localhost")
381 reactor.run(installSignalHandlers=True)