1 # Baris Metin <tmetin@sophia.inria.fr>
import sys
import Queue

from twisted.words.xish import domish
from twisted.web import xmlrpc, server
from twisted.internet import reactor, task
from twisted.words.protocols.jabber import xmlstream, client, jid

# The PLC API is installed outside the default module search path, so the
# path must be extended before PLC.Config can be imported.
sys.path.append("/usr/share/plc_api/")
from PLC.Config import Config
class BaseClient(object):
    """ Base XMPP client: handles authentication and basic presence/message requests.

    Attributes:
      id        -- jid.JID this client logs in as
      factory   -- XMPPClientFactory to pass to reactor.connectTCP()
      xmlstream -- the connected stream; None until event_connected() runs
      verbose   -- when true, raw stream traffic is written to the log
      log       -- object with a write() method that receives log lines
    """

    def __init__(self, id, secret, verbose = False, log = None):
        """ Build the client factory and register the stream event callbacks.

        id      -- a jid.JID, or a string that is converted to one
        secret  -- password handed to XMPPClientFactory
        verbose -- log raw traffic when true
        log     -- writable object for log output (defaults to sys.stdout)
        """
        if isinstance(id, (str, unicode)):
            id = jid.JID(id)

        x = client.XMPPClientFactory(id, secret)
        x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
        x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
        x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
        x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)

        # Kept on the instance: callers need both to open the TCP connection,
        # and the id is used as the 'from' address of outgoing stanzas.
        self.id = id
        self.factory = x
        self.xmlstream = None
        self.verbose = verbose
        self.log = log or sys.stdout

    def __rawDataIN(self, buf):
        """ Log raw incoming stream data when verbose. """
        if self.verbose:
            self.msg("RECV: %s" % buf)

    def __rawDataOUT(self, buf):
        """ Log raw outgoing stream data when verbose. """
        if self.verbose:
            self.msg("SEND: %s" % buf)

    def msg(self, msg):
        """ Write one line to the log. """
        self.log.write("%s\n" % msg)

    def error(self, msg):
        """ Log msg with an ERROR prefix. """
        self.msg("ERROR: %s" % msg)

    def warn(self, msg):
        """ Log msg with a WARN prefix. """
        self.msg("WARN: %s" % msg)

    def info(self, msg):
        """ Log msg with an INFO prefix. """
        self.msg("INFO: %s" % msg)

    def event_connected(self, xs):
        """ Remember the stream and hook the raw-traffic loggers into it. """
        # Every handler below sends through self.xmlstream.
        self.xmlstream = xs
        xs.rawDataInFn = self.__rawDataIN
        xs.rawDataOutFn = self.__rawDataOUT

    def event_disconnected(self, xs):
        """ Called when the stream ends; nothing to do here. """

    def event_init_failed(self, xs):
        """ Report a failed stream initialization. """
        self.error("Init Failed")

    def event_authenticated(self, xs):
        """ Announce our presence and register the stanza handlers. """
        presence = domish.Element(("jabber:client", "presence"))
        presence.addElement("show", content="dnd")
        presence.addElement("status", content="man at work")
        xs.send(presence)

        # add protocol handlers
        xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
        xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
        xs.addObserver("/presence", self.presence)
        xs.addObserver("/message[@type='chat']", self.message_chat)

    def presence_subscribe(self, m):
        """ Grant an incoming subscription request. """
        self.info("%s request to add us, granting." % m['from'])
        p = domish.Element(("jabber:client", "presence"))
        p['from'], p['to'] = m['to'], m['from']
        p['type'] = "subscribed"
        self.xmlstream.send(p)

    def presence_unsubscribe(self, m):
        """ Answer an unsubscribe by sending a new subscribe request back. """
        self.info("%s removed us, trying to re-authenticate." % m['from'])
        p = domish.Element(("jabber:client", "presence"))
        p['from'], p['to'] = m['to'], m['from']
        p['type'] = "subscribe"
        self.xmlstream.send(p)

    def presence(self, m):
        """ Reply to a plain availability presence with our own status.

        Stanzas that are ignored:
          - those without both 'from' and 'to' (there is no one to answer);
          - typed presence (subscribe/unsubscribe have dedicated handlers,
            and the other types are not availability announcements);
          - presence coming from our own account, because answering the
            echo of our own presence would loop forever.

        NOTE(review): two peers that both answer presence this way would
        still ping-pong; confirm no such peer is on the roster.
        """
        sender = m.getAttribute('from')
        recipient = m.getAttribute('to')
        if not sender or not recipient or m.hasAttribute('type'):
            return
        if jid.JID(sender).userhost() == self.id.userhost():
            return

        p = domish.Element(("jabber:client", "presence"))
        p['from'], p['to'] = recipient, sender
        p.addElement("show", content="dnd")
        p.addElement("status", content="man at work")
        self.xmlstream.send(p)

    def message_chat(self, m):
        """ Answer any chat message with a fixed 'busy' reply. """
        n = domish.Element((None, "message"))
        n['to'], n['from'] = m['from'], m['to']
        n.addElement("body", content = "don't have time to chat. working!")
        self.xmlstream.send(n)
class PubSubClient(BaseClient):
    """ PubSub (XEP 0060) implementation

    Attributes:
      hooks    -- maps a result name (e.g. 'discover') to a callable
      requests -- maps the id of each outstanding iq to the node it concerns
    """

    # Address every pubsub request is sent to; override in a subclass or on
    # the instance to talk to a different service.
    PUBSUB_SERVICE = "pubsub.vplc27.inria.fr"

    def __init__(self, id, secret, verbose = False, log = None):
        """ Same arguments as BaseClient; also sets up hook/request tables. """
        BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
        self.hooks = {}
        self.requests = {}

    def add_result_hook(self, hook_to, hook):
        """ Register hook under the name hook_to, replacing any previous one. """
        self.hooks[hook_to] = hook

    def delete_result_hook(self, hook_to):
        """ Remove the hook registered under hook_to, if there is one. """
        self.hooks.pop(hook_to, None)

    def event_authenticated(self, xs):
        """ Do the base-class setup, then register the pubsub result handlers. """
        BaseClient.event_authenticated(self, xs)

        xs.addObserver("/iq/pubsub/create", self.result_create_node)
        xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
        xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)

    def __iq(self, t="get"):
        """ Return a new iq stanza of type t addressed to the pubsub service.

        The stanza gets a unique 'id' so that the reply can be matched with
        the entry stored in self.requests.
        """
        iq = domish.Element((None, "iq"))
        iq['from'] = self.id.full()
        iq['to'] = self.PUBSUB_SERVICE
        iq['type'] = t
        iq.addUniqueId()
        return iq

    def discover(self, node = None):
        """ Ask the service for the items of node (no node attribute if None). """
        iq = self.__iq("get")
        query = iq.addElement("query")
        query['xmlns'] = "http://jabber.org/protocol/disco#items"
        if node:
            query['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_discover(self, iq):
        """ Handle a disco#items stanza.

        Each child element of the query is passed to the 'discover' hook,
        when one is registered.
        """
        # Popped with a default: stanzas whose id we never issued also match
        # the observer and must not raise.
        node = self.requests.pop(iq.getAttribute('id'), None)
        if self.verbose:
            self.info("Items for node: %s" % node)

        hook = self.hooks.get('discover', None)
        for i in iq.query.elements():
            if self.verbose:
                self.msg(i.toXml())
            if hook:
                hook(i)

    def create_node(self, node = None):
        """ Request creation of node; with node=None no node id is sent. """
        iq = self.__iq("set")
        pubsub = iq.addElement("pubsub")
        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
        create = pubsub.addElement("create")
        if node:
            create['node'] = node
        # Empty <configure/>: no configuration options are supplied.
        pubsub.addElement("configure")
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_create_node(self, iq):
        """ Forget the pending create request and log an error reply, if any.

        A <conflict/> error only produces a warning, since the node being
        there already is harmless; any other error is logged as an error.
        """
        node = self.requests.pop(iq.getAttribute('id'), None)
        if iq.getAttribute('type') != 'error':
            return

        # domish returns None for a child element that is absent.
        err = iq.error
        if err is not None and err.conflict is not None:
            self.warn("NodeID exists: %s" % node)
            return

        err_type = err_name = ""
        if err is not None:
            err_type = err.getAttribute('type', "")
            first = err.firstChildElement()
            if first is not None:
                err_name = first.name
        self.error("Can not create node: %s (error type: %s, %s)" % (node, err_type, err_name))

    def delete_node(self, node):
        """ Request deletion of node. """
        iq = self.__iq("set")
        pubsub = iq.addElement("pubsub")
        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub#owner"
        delete = pubsub.addElement("delete")
        delete['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_delete_node(self, iq):
        """ Forget the pending delete request this stanza answers. """
        self.requests.pop(iq.getAttribute('id'), None)
class Slicemgr(PubSubClient, xmlrpc.XMLRPC):
    """ Manages pubsub nodes for slices and their resources.

    Requests arrive through the XML-RPC methods and are only queued; the
    queue is drained by flush_commands(), which is meant to be called
    periodically. Node names have the form
    DOMAIN/<slice>[/RESOURCES[/<resource>]].
    """

    # NOTE(review): DOMAIN is used by every method below but its definition
    # is not visible in this file; "/OMF" is assumed -- confirm before use.
    DOMAIN = '/OMF'
    RESOURCES = 'resources'

    def __init__(self, id, secret, verbose = False, log = None):
        """ Same arguments as PubSubClient. """
        PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
        # Initialise the XML-RPC resource side of this object as well.
        xmlrpc.XMLRPC.__init__(self)
        self.command_queue = Queue.Queue()

        # for xmlrpc interface: the exported methods return None
        self.allowNone = True

    def xmlrpc_createSlice(self, slice):
        """ XML-RPC: queue creation of the nodes for slice. """
        self.create_slice(slice)

    def xmlrpc_addResource(self, slice, resource):
        """ XML-RPC: queue creation of the node for resource in slice. """
        self.add_resource(slice, resource)

    def xmlrpc_deleteSlice(self, slice):
        """ XML-RPC: queue deletion of the nodes of slice. """
        self.delete_slice(slice)

    def xmlrpc_removeResource(self, slice, resource):
        """ XML-RPC: queue deletion of the node for resource in slice. """
        self.delete_resource(slice, resource)

    def flush_commands(self):
        """ Run every queued (method, parameter) command. """
        while not self.command_queue.empty():
            (meth, param) = self.command_queue.get()
            try:
                meth(param)
            except Exception as e:
                # A failing command must not escape: an exception raised
                # from a LoopingCall stops all further calls.
                self.error("Command %s(%r) failed: %s" % (getattr(meth, '__name__', meth), param, e))

    def create_slice(self, slice):
        """ Queue creation of the slice node and its resources node. """
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))

    def add_resource(self, slice, resource):
        """ Queue creation of the node for resource under slice. """
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))

    def delete_slice(self, slice):
        """ Queue deletion of the slice node, its resources node and,
        once the service has listed them, every node below the resources
        node.
        """
        slice_prefix = "/".join([self.DOMAIN,slice])
        resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])

        def delete_slice_resources(item):
            # Queue deletion of a listed node that lives below this slice's
            # resources node; the resources node itself is queued below.
            node = item.getAttribute('node')
            if node and node.startswith(resource_prefix + "/"):
                self.command_queue.put(( self.delete_node, node ))

        # The listing is answered asynchronously, so the hook has to stay
        # registered after this method returns. It remains in place until
        # another hook replaces it.
        # NOTE(review): a second delete_slice() issued before the first
        # listing arrives replaces the hook; confirm this is acceptable.
        self.add_result_hook("discover", delete_slice_resources)
        self.command_queue.put(( self.discover, None ))

        self.command_queue.put(( self.delete_node, resource_prefix) )
        self.command_queue.put(( self.delete_node, slice_prefix) )

    def delete_resource(self, slice, resource):
        """ Queue deletion of the node for resource under slice. """
        self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
if __name__ == "__main__":
    # Read the XMPP account settings from the PLC configuration.
    config = Config("/etc/planetlab/plc_config")

    xmppserver = config.PLC_OMF_XMPP_SERVER
    xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
    xmpppass = config.PLC_OMF_XMPP_PASSWORD
    slicemgr = Slicemgr(xmppuser, xmpppass,
                        log=open("/var/log/omf/pubsub_client.log", "a"))

    # Drain the command queue periodically from the reactor thread.
    t = task.LoopingCall(slicemgr.flush_commands)
    t.start(5.0) # check every 5 seconds
    # 5222: XMPP client connection; 5053: XML-RPC interface of this daemon.
    reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
    reactor.listenTCP(5053, server.Site(slicemgr))
    reactor.run(installSignalHandlers=True)