2 # Baris Metin <tmetin@sophia.inria.fr>
import sys
import time

try:
    import Queue  # Python 2 module name; the code below uses Queue.Queue
except ImportError:
    import queue as Queue  # same module under its Python 3 name

from twisted.internet import reactor, task
from twisted.web import xmlrpc, server
from twisted.words.protocols.jabber import xmlstream, client, jid
from twisted.words.xish import domish

# PLC.Config lives outside the default module search path.
sys.path.append("/usr/share/plc_api/")
from PLC.Config import Config
class BaseClient(object):
    """ Base XMPP client: handles authentication and basic presence/message requests.

    The constructor builds a ``client.XMPPClientFactory`` and registers this
    object's ``event_*`` methods as bootstrap handlers for the stream
    connected / end / init-failed / authenticated events.  The factory is
    stored in ``self.factory``; connecting it to a reactor is up to the caller.
    """

    def __init__(self, id, secret, verbose=False, log=None):
        """ Prepare the client factory and the logging target.

        id      -- account JID, either a ``jid.JID`` or a string that is
                   converted to one.
        secret  -- password handed to ``client.XMPPClientFactory``.
        verbose -- when true, raw stream traffic is written to the log.
        log     -- object with a ``write`` method; defaults to ``sys.stdout``.
        """
        # ``unicode`` exists on Python 2 only; elsewhere ``str`` covers all text.
        try:
            text_types = (str, unicode)
        except NameError:
            text_types = (str,)
        if isinstance(id, text_types):
            id = jid.JID(id)

        x = client.XMPPClientFactory(id, secret)
        x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
        x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
        x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
        x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)

        # The handlers below read self.id (.full(), .host) and callers read
        # self.factory, so both must be kept on the instance.
        self.id = id
        self.factory = x
        # Replaced by the live stream in event_connected.
        self.xmlstream = None
        self.verbose = verbose
        self.log = log or sys.stdout

    def __rawDataIN(self, buf):
        """ Log raw incoming stream data when verbose. """
        if self.verbose:
            self.msg("RECV: %s" % buf)

    def __rawDataOUT(self, buf):
        """ Log raw outgoing stream data when verbose. """
        if self.verbose:
            self.msg("SEND: %s" % buf)

    def msg(self, msg):
        """ Write one line to the log. """
        self.log.write("%s\n" % msg)
        # Push the line out right away when the log object supports it, so a
        # long-running process does not sit on buffered messages.
        flush = getattr(self.log, "flush", None)
        if flush is not None:
            flush()

    def error(self, msg):
        """ Log ``msg`` with an ERROR prefix. """
        self.msg("ERROR: %s" % msg)

    def warn(self, msg):
        """ Log ``msg`` with a WARN prefix. """
        self.msg("WARN: %s" % msg)

    def info(self, msg):
        """ Log ``msg`` with an INFO prefix. """
        self.msg("INFO: %s" % msg)

    def event_connected(self, xs):
        """ Stream connected: hook up traffic logging and remember the stream. """
        xs.rawDataInFn = self.__rawDataIN
        xs.rawDataOutFn = self.__rawDataOUT
        # Every send in this class goes through self.xmlstream.
        self.xmlstream = xs

    def event_disconnected(self, xs):
        """ Stream ended: nothing to do. """

    def event_init_failed(self, xs):
        """ Stream initialisation failed: log it. """
        self.error("Init Failed")

    def event_authenticated(self, xs):
        """ Authenticated: announce presence and register stanza observers. """
        presence = domish.Element(("jabber:client", "presence"))
        presence.addElement("show", content="dnd")
        presence.addElement("status", content="man at work")
        xs.send(presence)

        # add protocol handlers
        xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
        xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
        xs.addObserver("/presence", self.presence)
        xs.addObserver("/message[@type='chat']", self.message_chat)

    def _presence_reply(self, m):
        """ Build a presence stanza addressed back to the sender of ``m``. """
        p = domish.Element(("jabber:client", "presence"))
        p['from'], p['to'] = m['to'], m['from']
        return p

    def presence_subscribe(self, m):
        """ Grant an incoming subscription request. """
        self.info("%s request to add us, granting." % m['from'])
        p = self._presence_reply(m)
        p['type'] = "subscribed"
        self.xmlstream.send(p)

    def presence_unsubscribe(self, m):
        """ The peer removed us: ask to be subscribed again. """
        self.info("%s removed us, trying to re-authenticate." % m['from'])
        p = self._presence_reply(m)
        p['type'] = "subscribe"
        self.xmlstream.send(p)

    def presence(self, m):
        """ Answer any presence stanza with our own 'dnd' presence. """
        p = self._presence_reply(m)
        # initially read presence.addElement, my wild guess.. -- Thierry
        p.addElement("show", content="dnd")
        p.addElement("status", content="man at work")
        self.xmlstream.send(p)

    def message_chat(self, m):
        """ Answer a chat message with a fixed 'busy' reply. """
        n = domish.Element((None, "message"))
        # Address the reply to whoever wrote to us.
        n['to'] = m['from']
        n['from'] = self.id.full()
        n.addElement("body", content = "don't have time to chat. working!")
        self.xmlstream.send(n)
class PubSubClient(BaseClient):
    """ PubSub (XEP 0060) implementation

    Requests are ``iq`` stanzas addressed to ``pubsub.<host of our JID>``.
    The id of every request sent is remembered in ``self.requests``
    (id -> node name) until its result handler runs.  ``self.hooks`` maps a
    result kind ('discover', 'configure') to a one-shot callback that is
    handed the result stanza.
    """

    def __init__(self, id, secret, verbose=False, log=None):
        BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
        self.hooks = {}
        self.requests = {}

    def add_result_hook(self, hook_to, hook):
        """ Register ``hook`` to be called with the next ``hook_to`` result. """
        self.hooks[hook_to] = hook

    def delete_result_hook(self, hook_to):
        """ Remove the hook registered for ``hook_to``, if there is one. """
        self.hooks.pop(hook_to, None)

    def _run_hook(self, hook_to, iq):
        """ Call the hook registered for ``hook_to`` with ``iq``, then drop it. """
        hook = self.hooks.get(hook_to, None)
        if hook:
            hook(iq)
            self.delete_result_hook(hook_to)

    def _forget_request(self, iq):
        """ Drop the bookkeeping entry for ``iq``; return its node or None.

        Tolerates stanzas whose id is unknown or already handled, so a result
        that reaches a handler twice does not raise.
        """
        return self.requests.pop(iq.getAttribute('id'), None)

    def event_authenticated(self, xs):
        """ Register the pubsub result observers on top of the base ones. """
        BaseClient.event_authenticated(self, xs)

        xs.addObserver("/iq/pubsub/create", self.result_create_node)
        xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
        xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
        xs.addObserver("/iq/pubsub/subscription[@subscription='subscribed']", self.result_subscribe_to_node)
        xs.addObserver("/iq/pubsub/configure/x", self.result_configure_node)
        xs.addObserver("/iq/pubsub/configure/error", self.result_configure_node)

    def __iq(self, t="get"):
        """ Build an ``iq`` of type ``t`` addressed to the pubsub service.

        The stanza gets a unique id because callers key ``self.requests`` on
        ``iq['id']``.
        """
        iq = domish.Element((None, "iq"))
        iq['from'] = self.id.full()
        iq['to'] = "pubsub.%s" % self.id.host
        iq['type'] = t
        iq.addUniqueId()
        return iq

    def __add_pubsub(self, iq):
        """ Add a ``pubsub`` child to ``iq`` and return it. """
        pubsub = iq.addElement("pubsub")
        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
        return pubsub

    def discover(self, node = None):
        """ Send a disco#items query, optionally restricted to ``node``. """
        iq = self.__iq("get")
        query = iq.addElement("query")
        query['xmlns'] = "http://jabber.org/protocol/disco#items"
        if node:
            query['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_discover(self, iq):
        """ Hand a discovery result to the 'discover' hook, if any. """
        self._run_hook('discover', iq)
        self._forget_request(iq)

    def subscribe_to_node(self, node):
        """ Ask the service to subscribe our JID to ``node``. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        subscribe = pubsub.addElement("subscribe")
        subscribe['node'] = node
        subscribe['jid'] = self.id.full()
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_subscribe_to_node(self, iq):
        self._forget_request(iq)

    def publish_to_node(self, node, payload):
        """ Publish ``payload`` as the content of one item on ``node``. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        publish = pubsub.addElement("publish")
        publish['node'] = node
        publish.addElement("item", content=payload)
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_publish_to_node(self, iq):
        # NOTE(review): event_authenticated registers no observer for publish
        # results, so nothing in this class calls this handler -- confirm
        # whether that is intended.
        self._forget_request(iq)

    # TODO: ejabberd doesn't have the node configuration feature implemented yet!
    def configure_node(self, node, fields=None):
        """ Send a ``configure`` request for ``node``.

        NOTE(review): ``fields`` is accepted for interface compatibility but is
        not serialised into the request -- confirm the intended format before
        relying on it.
        """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        configure = pubsub.addElement("configure")
        configure['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_configure_node(self, iq):
        """ Hand a configure result (form or error) to the 'configure' hook. """
        self._run_hook('configure', iq)
        self._forget_request(iq)

    def create_node(self, node = None):
        """ Ask the service to create ``node``.

        Without a node name the ``create`` element carries no ``node``
        attribute.
        """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        create = pubsub.addElement("create")
        if node:
            create['node'] = node
        pubsub.addElement("configure")
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_create_node(self, iq):
        """ Log the outcome of a create request; subscribe on success. """
        node = self._forget_request(iq)
        # domish returns None for a child element that is not present.
        error = iq.error
        if error is None:
            # try subscribing to the node for debugging purposes
            self.subscribe_to_node(node)
        elif error.conflict is not None:
            # node is already there, nothing important.
            self.warn("NodeID exists: %s" % node)
        else:
            err_type = error.getAttribute('type')
            self.error("Can not create node: %s (error type: %s)" % (node, err_type))

    def delete_node(self, node):
        """ Ask the service to delete ``node``. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        delete = pubsub.addElement("delete")
        delete['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_delete_node(self, iq):
        self._forget_request(iq)

    def _reply_chat(self, m, text):
        """ Send ``text`` as a message back to the sender of ``m``. """
        n = domish.Element((None, "message"))
        n['to'] = m['from']
        n['from'] = self.id.full()
        n.addElement("body", content = text)
        self.xmlstream.send(n)

    def message_chat(self, m):
        """ Handle the chat commands understood by this client.

        "list groups"        -- reply with the node names from a discovery.
        "configuration NODE" -- request NODE's configuration and reply with
                                the answer.
        Anything else gets the base class's default reply.
        """
        body = ""
        for e in m.elements():
            if e.name == "body":
                # "%s" formatting yields the element's text content.
                body = "%s" % e
                break

        if body == "list groups":
            def list_groups(iq):
                reply = ""
                for i in iq.query.elements():
                    reply += "%s\n" % i['node']
                self._reply_chat(m, reply)

            self.add_result_hook("discover", list_groups)
            # The hook only fires once a discovery result comes back.
            self.discover()

        elif body.startswith("configuration"):
            # "configuration NODE"
            words = body.split()
            if len(words) < 2:
                # No node name given: nothing to ask the service about.
                BaseClient.message_chat(self, m)
                return
            node = words[1]

            def get_configuration(iq):
                # NOTE(review): the reply format is an assumption -- the raw
                # result stanza is sent back as XML; confirm.
                self._reply_chat(m, iq.toXml())

            self.add_result_hook("configure", get_configuration)
            self.configure_node(node)

        else:
            BaseClient.message_chat(self, m)
class Slicemgr(xmlrpc.XMLRPC, PubSubClient):
    """ XML-RPC front end that maps slices and resources onto pubsub nodes.

    A slice is the node ``DOMAIN/<slice>``, its resources live under
    ``DOMAIN/<slice>/RESOURCES/<resource>``.  The XML-RPC methods only queue
    ``(method, argument)`` pairs; ``flush_commands`` executes them.
    """

    # NOTE(review): value assumed from the "/OMF" root node that the script
    # entry point creates -- confirm.
    DOMAIN = "/OMF"
    RESOURCES = 'resources'

    def __init__(self, id, secret, verbose=False, log=None):
        xmlrpc.XMLRPC.__init__(self, allowNone=True)
        PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
        self.command_queue = Queue.Queue()

        xmlrpc.addIntrospection(self)

    def xmlrpc_createSlice(self, slice):
        self.create_slice(slice)

    def xmlrpc_addResource(self, slice, resource):
        self.add_resource(slice, resource)

    def xmlrpc_deleteSlice(self, slice):
        self.delete_slice(slice)

    def xmlrpc_removeResource(self, slice, resource):
        self.delete_resource(slice, resource)

    def flush_commands(self):
        """ Run every queued ``(method, argument)`` pair. """
        # self.info("Running %d commands" % self.command_queue.qsize())
        while not self.command_queue.empty():
            (meth, param) = self.command_queue.get()
            try:
                meth(param)
            except Exception as exc:
                # This runs from a periodic LoopingCall, which stops for good
                # when its function raises; log and carry on with the rest.
                self.error("Command %s(%r) failed: %s" % (getattr(meth, "__name__", meth), param, exc))

    def create_slice(self, slice):
        """ Queue creation of the slice node and of its resources node. """
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))

    def add_resource(self, slice, resource):
        """ Queue creation of the node for ``resource`` in ``slice``. """
        resname = "/".join([self.DOMAIN,slice,self.RESOURCES,resource])
        self.command_queue.put(( self.create_node, resname ))

    def delete_slice(self, slice):
        """ Queue deletion of the slice's nodes.

        The resources node and the slice node are queued right away.  The
        individual resource nodes are queued when the discovery result lists
        them.
        """
        slice_prefix = "/".join([self.DOMAIN,slice])
        resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])
        # Resource nodes are named resource_prefix + "/" + resource, so match
        # with the separator; resource_prefix itself is queued below.
        child_prefix = resource_prefix + "/"

        def delete_slice_resources(iq):
            for i in iq.query.elements():
                node = i.getAttribute('node')
                if node and node.startswith(child_prefix):
                    self.command_queue.put((self.delete_node, node))

        self.add_result_hook("discover", delete_slice_resources)
        # The hook only fires once a discovery result comes back.
        self.discover()

        self.command_queue.put(( self.delete_node, resource_prefix) )
        self.command_queue.put(( self.delete_node, slice_prefix) )

    def delete_resource(self, slice, resource):
        """ Queue deletion of the node for ``resource`` in ``slice``. """
        self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
if __name__ == "__main__":
    # Account and server settings come from the PLC configuration file.
    config = Config("/etc/planetlab/plc_config")

    xmppserver = config.PLC_OMF_XMPP_SERVER
    xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
    xmpppass = config.PLC_OMF_XMPP_PASSWORD
    # The log file name is fixed at start-up from the current year and month.
    monthstring = time.strftime("%Y-%m")
    logfile = open("/var/log/omf/pubsub-client-%s.log" % monthstring, "a")
    try:
        slicemgr = Slicemgr(xmppuser, xmpppass,
                            log=logfile,
                            # used to be verbose=True but that amounts to huge totally helpless logs, so..
                            verbose=False)

        t = task.LoopingCall(slicemgr.flush_commands)
        t.start(5.0) # check every 5 seconds

        # Root nodes, requested one second after the reactor starts.
        reactor.callLater(1, slicemgr.create_node, "/OMF")
        reactor.callLater(1, slicemgr.create_node, "/OMF/SYSTEM")

        # XMPP client connection, plus the XML-RPC server on localhost only.
        reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
        reactor.listenTCP(5053, server.Site(slicemgr), interface="localhost")
        reactor.run(installSignalHandlers=True)
    finally:
        # Close the log whether the reactor stopped normally or set-up failed.
        logfile.close()