#!/usr/bin/python # Baris Metin import os import sys import time import Queue from twisted.words.xish import domish from twisted.web import xmlrpc, server from twisted.internet import reactor, task from twisted.words.protocols.jabber import xmlstream, client, jid sys.path.append("/usr/share/plc_api/") from PLC.Config import Config class BaseClient(object): """ Base XMPP client: handles authentication and basic presence/message requests. """ def __init__(self, id, secret, verbose = False, log = None): if isinstance(id, (str, unicode)): id = jid.JID(id) x = client.XMPPClientFactory(id, secret) x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected) x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected) x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed) x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated) self.id = id self.factory = x self.verbose = verbose self.log = log or sys.stdout def __rawDataIN(self, buf): if self.verbose: self.msg("RECV: %s" % buf) def __rawDataOUT(self, buf): if self.verbose: self.msg("SEND: %s" % buf) def msg(self, msg): self.log.write("%s\n" % msg) self.log.flush() def error(self, msg): self.msg("ERROR: %s" % msg) def warn(self, msg): self.msg("WARN: %s" % msg) def info(self, msg): self.msg("INFO: %s" % msg) def event_connected(self, xs): # log all traffic xs.rawDataInFn = self.__rawDataIN xs.rawDataOutFn = self.__rawDataOUT self.xmlstream = xs def event_disconnected(self, xs): pass def event_init_failed(self, xs): self.error("Init Failed") def event_authenticated(self, xs): presence = domish.Element(("jabber:client", "presence")) presence.addElement("show", content="dnd") presence.addElement("status", content="man at work") xs.send(presence) # add protocol handlers xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe) xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe) xs.addObserver("/presence", self.presence) xs.addObserver("/message[@type='chat']", self.message_chat) def presence_subscribe(self, m): self.info("%s request to add us, granting." % m['from']) p = domish.Element(("jabber:client", "presence")) p['from'], p['to'] = m['to'], m['from'] p['type'] = "subscribed" self.xmlstream.send(p) def presence_unsubscribe(self, m): # try to re-subscribe self.info("%s removed us, trying to re-authenticate." % m['from']) p = domish.Element(("jabber:client", "presence")) p['from'], p['to'] = m['to'], m['from'] p['type'] = "subscribe" self.xmlstream.send(p) def presence(self, m): p = domish.Element(("jabber:client", "presence")) p['from'], p['to'] = m['to'], m['from'] # initially read presence.addElement, my wild guess.. -- Thierry p.addElement("show", content="dnd") p.addElement("status", content="man at work") self.xmlstream.send(p) def message_chat(self, m): n = domish.Element((None, "message")) n['to'] = m['from'] n['from'] = self.id.full() n.addElement("body", content = "don't have time to chat. working!") self.xmlstream.send(n) class PubSubClient(BaseClient): """ PubSub (XEP 0060) implementation """ def __init__(self, id, secret, verbose = False, log = None): BaseClient.__init__(self, id, secret, verbose = verbose, log = log) self.hooks = {} def add_result_hook(self, hook_to, hook): self.hooks[hook_to] = hook def delete_result_hook(self, hook_to): if self.hooks.has_key(hook_to): del self.hooks[hook_to] def event_authenticated(self, xs): BaseClient.event_authenticated(self, xs) self.requests = {} xs.addObserver("/iq/pubsub/create", self.result_create_node) xs.addObserver("/iq/pubsub/delete", self.result_delete_node) xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover) xs.addObserver("/iq/pubsub/subscription[@subscription='subscribed']", self.result_subscribe_to_node) xs.addObserver("/iq/pubsub/configure/x", self.result_configure_node) xs.addObserver("/iq/pubsub/configure/error", self.result_configure_node) def __iq(self, t="get"): iq = domish.Element((None, "iq")) iq['from'] = self.id.full() iq['to'] = "pubsub.%s" % self.id.host iq['type'] = t iq.addUniqueId() return iq def __add_pubsub(self, iq): pubsub = iq.addElement("pubsub") pubsub['xmlns'] = "http://jabber.org/protocol/pubsub" return pubsub def discover(self, node = None): iq = self.__iq("get") query = iq.addElement("query") query['xmlns'] = "http://jabber.org/protocol/disco#items" if node: query['node'] = node self.requests[iq['id']] = node self.xmlstream.send(iq) def result_discover(self, iq): hook = self.hooks.get('discover', None) if hook: hook(iq) self.delete_result_hook('discover') self.requests.pop(iq['id']) def subscribe_to_node(self, node): iq = self.__iq("set") pubsub = self.__add_pubsub(iq) subscribe = pubsub.addElement("subscribe") subscribe['node'] = node subscribe['jid'] = self.id.full() self.requests[iq['id']] = node self.xmlstream.send(iq) def result_subscribe_to_node(self, iq): self.requests.pop(iq['id']) def publish_to_node(self, node, payload): iq = self.__iq("set") pubsub = self.__add_pubsub(iq) publish = pubsub.addElement("publish") publish['node'] = node items = publish.addElement("item", content=payload) self.requests[iq['id']] = node self.xmlstream.send(iq) def result_publish_to_node(self, iq): self.requests.pop(iq['id']) # TODO: ejabberd doesn't have the node configuration feature implmented yet! def configure_node(self, node, fields=None): iq = self.__iq("set") pubsub = self.__add_pubsub(iq) configure = pubsub.addElement("configure") configure['node'] = node # TODO: add fields self.requests[iq['id']] = node self.xmlstream.send(iq) def result_configure_node(self, iq): hook = self.hooks.get('configure', None) if hook: hook(iq) self.delete_result_hook('configure') self.requests.pop(iq['id']) def create_node(self, node = None): iq = self.__iq("set") pubsub = self.__add_pubsub(iq) create = pubsub.addElement("create") if node: create['node'] = node configure = pubsub.addElement("configure") self.requests[iq['id']] = node self.xmlstream.send(iq) def result_create_node(self, iq): node = self.requests[iq['id']] if iq.error: if iq.error.conflict: # node is already there, nothing important. self.warn("NodeID exists: %s" % node) else: err_type = "" if iq.error['type']: err_type = iq.error['type'] self.error("Can not create node: %s (error type: %s)" % (node, err_type)) else: # no errors # try subscribing to the node for debugging purposes self.subscribe_to_node(node) self.requests.pop(iq['id']) def delete_node(self, node): iq = self.__iq("set") pubsub = self.__add_pubsub(iq) delete = pubsub.addElement("delete") delete['node'] = node self.requests[iq['id']] = node self.xmlstream.send(iq) def result_delete_node(self, iq): self.requests.pop(iq['id']) def message_chat(self, m): body = "" for e in m.elements(): if e.name == "body": body = "%s" % e break # try: # node = m.event.items['node'] # n = domish.Element((None, "message")) # n.addElement("body", content = "published to: %s\n%s" % (node, m.event.items.toXml())) # # for each listener in promiscuous mode send the published event # self.xmlstream.send(n) # return # except: # # not a pubsub message continue on # pass if body == "list groups": def list_groups(iq): reply = "" for i in iq.query.elements(): reply += "%s\n" % i['node'] n = domish.Element((None, "message")) n['to'] = m['from'] n['from'] = self.id.full() n.addElement("body", content = reply) self.xmlstream.send(n) self.add_result_hook("discover", list_groups) self.discover() elif body.startswith("configuration"): # "configuration NODE" node = "" try: node = body.split()[1].strip() except IndexError: pass def get_configuration(iq): reply = iq.toXml() n = domish.Element((None, "message")) n['to'] = m['from'] n['from'] = self.id.full() n.addElement("body", content = reply) self.xmlstream.send(n) self.add_result_hook("configure", get_configuration) self.configure_node(node) else: BaseClient.message_chat(self, m) class Slicemgr(xmlrpc.XMLRPC, PubSubClient): DOMAIN = "/OMF" RESOURCES = 'resources' def __init__(self, id, secret, verbose = False, log = None): xmlrpc.XMLRPC.__init__(self, allowNone=True) PubSubClient.__init__(self, id, secret, verbose = verbose, log = log) self.command_queue = Queue.Queue() xmlrpc.addIntrospection(self) def xmlrpc_createSlice(self, slice): self.create_slice(slice) def xmlrpc_addResource(self, slice, resource): self.add_resource(slice, resource) def xmlrpc_deleteSlice(self, slice): self.delete_slice(slice) def xmlrpc_removeResource(self, slice, resource): self.delete_resource(slice, resource) def flush_commands(self): # self.info("Running %d commands" % self.command_queue.qsize()) while not self.command_queue.empty(): (meth, param) = self.command_queue.get() meth(param) def create_slice(self, slice): self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) )) self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) )) def add_resource(self, slice, resource): resname = "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) self.command_queue.put(( self.create_node, resname )) def delete_slice(self, slice): slice_prefix = "/".join([self.DOMAIN,slice]) resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES]) def delete_slice_resources(iq): for i in iq.query.elements(): node = i['node'] if node.startswith(resource_prefix): self.command_queue.put((self.delete_node, node)) self.add_result_hook("discover", delete_slice_resources) self.discover() self.command_queue.put(( self.delete_node, resource_prefix) ) self.command_queue.put(( self.delete_node, slice_prefix) ) def delete_resource(self, slice, resource): self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) )) if __name__ == "__main__": config = Config("/etc/planetlab/plc_config") xmppserver = config.PLC_OMF_XMPP_SERVER xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver]) xmpppass = config.PLC_OMF_XMPP_PASSWORD monthstring=time.strftime("%Y-%m") slicemgr = Slicemgr(xmppuser, xmpppass, log=open("/var/log/omf/pubsub-client-%s.log"%monthstring, "a"), # used to be verbose=True but that amounts to huge totally helpless logs, so.. verbose=False) t = task.LoopingCall(slicemgr.flush_commands) t.start(5.0) # check every 5 seconds reactor.callLater(1, slicemgr.create_node, "/OMF") reactor.callLater(1, slicemgr.create_node, "/OMF/SYSTEM") reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory) reactor.listenTCP(5053, server.Site(slicemgr), interface="localhost") reactor.run(installSignalHandlers=True)