24e5aeef8b7b31c2693a1e5c7d3a69810200fd70
[plcapi.git] / omf / omf_slicemgr.py
1 #!/usr/bin/python
2 # Baris Metin <tmetin@sophia.inria.fr>
3
4 import os
5 import sys
6 import time
7 import Queue
8 from twisted.words.xish import domish
9 from twisted.web import xmlrpc, server
10 from twisted.internet import reactor, task
11 from twisted.words.protocols.jabber import xmlstream, client, jid
12
13 sys.path.append("/usr/share/plc_api/")
14 from PLC.Config import Config
15
16
class BaseClient(object):
    """ Base XMPP client: handles authentication and basic presence/message requests.

    The constructor builds a client factory for the given JID and registers
    the event_* methods below as bootstrap observers.  Stanza handlers reply
    through the stream remembered by event_connected().
    """

    def __init__(self, id, secret, verbose = False, log = None):
        """ Prepare the client factory (self.factory) for the given account.

        id      -- a jid.JID, or a string that is converted with jid.JID()
        secret  -- handed to client.XMPPClientFactory together with the JID
        verbose -- when true, raw stream traffic is written to the log
        log     -- object providing write() and flush(); sys.stdout is used
                   when omitted (or falsy)
        """
        if isinstance(id, (str, unicode)):
            id = jid.JID(id)

        factory = client.XMPPClientFactory(id, secret)
        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.event_connected),
            (xmlstream.STREAM_END_EVENT, self.event_disconnected),
            (xmlstream.INIT_FAILED_EVENT, self.event_init_failed),
            (xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated),
        )
        for event, handler in bootstraps:
            factory.addBootstrap(event, handler)

        self.id = id
        self.factory = factory
        self.verbose = verbose
        self.log = log if log else sys.stdout

    def __rawDataIN(self, buf):
        """ Log incoming raw data, only in verbose mode. """
        if not self.verbose:
            return
        self.msg("RECV: %s" % buf)

    def __rawDataOUT(self, buf):
        """ Log outgoing raw data, only in verbose mode. """
        if not self.verbose:
            return
        self.msg("SEND: %s" % buf)

    def msg(self, msg):
        """ Write one line to the log and flush it right away. """
        out = self.log
        out.write("%s\n" % msg)
        out.flush()

    def error(self, msg):
        """ Log msg with an ERROR prefix. """
        self.msg("ERROR: %s" % msg)

    def warn(self, msg):
        """ Log msg with a WARN prefix. """
        self.msg("WARN: %s" % msg)

    def info(self, msg):
        """ Log msg with an INFO prefix. """
        self.msg("INFO: %s" % msg)

    def event_connected(self, xs):
        """ Hook the traffic loggers into the stream and remember the stream. """
        # log all traffic
        xs.rawDataInFn = self.__rawDataIN
        xs.rawDataOutFn = self.__rawDataOUT
        self.xmlstream = xs

    def event_disconnected(self, xs):
        """ Nothing to do when the stream ends. """
        return None

    def event_init_failed(self, xs):
        """ Report a failed stream initialisation in the log. """
        self.error("Init Failed")

    def event_authenticated(self, xs):
        """ Announce our presence and install the stanza handlers on xs. """
        announce = domish.Element(("jabber:client", "presence"))
        announce.addElement("show", content="dnd")
        announce.addElement("status", content="man at work")
        xs.send(announce)

        # add protocol handlers
        handlers = (
            ("/presence[@type='subscribe']", self.presence_subscribe),
            ("/presence[@type='unsubscribe']", self.presence_unsubscribe),
            ("/presence", self.presence),
            ("/message[@type='chat']", self.message_chat),
        )
        for xpath, handler in handlers:
            xs.addObserver(xpath, handler)

    def _reply_presence(self, m):
        """ Build a presence element addressed back to the sender of m. """
        reply = domish.Element(("jabber:client", "presence"))
        sender, recipient = m['to'], m['from']
        reply['from'] = sender
        reply['to'] = recipient
        return reply

    def presence_subscribe(self, m):
        """ Grant a subscription request by answering 'subscribed'. """
        self.info("%s request to add us, granting." % m['from'])
        answer = self._reply_presence(m)
        answer['type'] = "subscribed"
        self.xmlstream.send(answer)

    def presence_unsubscribe(self, m):
        """ Answer an unsubscribe with a new 'subscribe' request. """
        # try to re-subscribe
        self.info("%s removed us, trying to re-authenticate." % m['from'])
        answer = self._reply_presence(m)
        answer['type'] = "subscribe"
        self.xmlstream.send(answer)

    def presence(self, m):
        """ Answer a presence stanza with our own 'dnd' status. """
        answer = self._reply_presence(m)
        # initially read presence.addElement, my wild guess.. -- Thierry
        answer.addElement("show", content="dnd")
        answer.addElement("status", content="man at work")
        self.xmlstream.send(answer)

    def message_chat(self, m):
        """ Answer a chat message with a canned 'busy' reply. """
        reply = domish.Element((None, "message"))
        reply['to'] = m['from']
        reply['from'] = self.id.full()
        reply.addElement("body", content = "don't have time to chat. working!")
        self.xmlstream.send(reply)
105
106     
class PubSubClient(BaseClient):
    """ PubSub (XEP 0060) implementation

    Requests are sent as <iq/> stanzas to "pubsub.<host of our JID>".
    self.requests maps the id of every request sent to the node name it was
    about, until the matching reply is handled.  self.hooks maps a reply
    kind ('discover', 'configure') to a one-shot callable that is given the
    reply stanza.
    """

    def __init__(self, id, secret, verbose = False, log = None):
        """ Same arguments as BaseClient.__init__. """
        BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
        self.hooks = {}
        # Created here as well, so that the request methods do not fail with
        # an AttributeError when called before the stream is authenticated.
        self.requests = {}

    def add_result_hook(self, hook_to, hook):
        """ Register hook to be called with the next reply of kind hook_to. """
        self.hooks[hook_to] = hook

    def delete_result_hook(self, hook_to):
        """ Remove the hook registered for hook_to, if there is one. """
        self.hooks.pop(hook_to, None)

    def event_authenticated(self, xs):
        """ Install the pubsub reply observers on the authenticated stream. """
        BaseClient.event_authenticated(self, xs)
        # start from a clean table on every (re)authentication
        self.requests = {}
        xs.addObserver("/iq/pubsub/create", self.result_create_node)
        xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
        # result_publish_to_node was defined but never registered, so the
        # entries added by publish_to_node() were never removed.
        xs.addObserver("/iq/pubsub/publish", self.result_publish_to_node)
        xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
        xs.addObserver("/iq/pubsub/subscription[@subscription='subscribed']", self.result_subscribe_to_node)
        xs.addObserver("/iq/pubsub/configure/x", self.result_configure_node)
        xs.addObserver("/iq/pubsub/configure/error", self.result_configure_node)

    def __iq(self, t="get"):
        """ Return a new <iq/> of type t addressed to the pubsub service. """
        iq = domish.Element((None, "iq"))
        iq['from'] = self.id.full()
        iq['to'] = "pubsub.%s" % self.id.host
        iq['type'] = t
        iq.addUniqueId()
        return iq

    def __add_pubsub(self, iq):
        """ Add a <pubsub/> child to iq and return it. """
        pubsub = iq.addElement("pubsub")
        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
        return pubsub

    def __pop_request(self, iq):
        """ Forget the request answered by iq; return its node name.

        Returns None when the id is unknown (e.g. a reply seen twice or one
        that belongs to an earlier stream) instead of raising KeyError.
        """
        return self.requests.pop(iq.getAttribute('id'), None)

    def __fire_hook(self, hook_to, iq):
        """ Call the one-shot hook registered for hook_to, if any.

        The hook is removed before it is called, so a hook that raises
        is not left armed for the next reply.
        """
        hook = self.hooks.pop(hook_to, None)
        if hook:
            hook(iq)


    def discover(self, node = None):
        """ Send a disco#items query, for node when given. """
        iq = self.__iq("get")
        query = iq.addElement("query")
        query['xmlns'] = "http://jabber.org/protocol/disco#items"
        if node:
            query['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_discover(self, iq):
        """ Handle a disco#items reply: run the 'discover' hook once. """
        self.__pop_request(iq)
        self.__fire_hook('discover', iq)


    def subscribe_to_node(self, node):
        """ Ask the service to subscribe our full JID to node. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        subscribe = pubsub.addElement("subscribe")
        subscribe['node'] = node
        subscribe['jid'] = self.id.full()
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_subscribe_to_node(self, iq):
        """ Handle a 'subscribed' reply. """
        self.__pop_request(iq)


    def publish_to_node(self, node, payload):
        """ Publish payload as the content of one <item/> on node. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        publish = pubsub.addElement("publish")
        publish['node'] = node
        publish.addElement("item", content=payload)
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_publish_to_node(self, iq):
        """ Handle a publish reply. """
        self.__pop_request(iq)


    # TODO: ejabberd doesn't have the node configuration feature implemented yet!
    def configure_node(self, node, fields=None):
        """ Send a <configure/> request for node.

        fields is accepted but not used yet (see the TODO below).
        """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        configure = pubsub.addElement("configure")
        configure['node'] = node

        # TODO: add fields

        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_configure_node(self, iq):
        """ Handle a configure reply: run the 'configure' hook once. """
        self.__pop_request(iq)
        self.__fire_hook('configure', iq)


    def create_node(self, node = None):
        """ Ask the service to create node (no node attribute when node is empty). """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        create = pubsub.addElement("create")
        if node:
            create['node'] = node
        # an empty <configure/> is sent along with the <create/>
        pubsub.addElement("configure")
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_create_node(self, iq):
        """ Handle a create reply: log errors, otherwise subscribe to the node. """
        node = self.__pop_request(iq)
        if node is None:
            # No name on record (unknown id, or create_node() was called
            # without one): fall back to the name carried by the reply.
            pubsub = iq.pubsub
            created = None
            if pubsub is not None:
                created = pubsub.create
            if created is not None:
                node = created.getAttribute('node')

        if iq.error:
            if iq.error.conflict:
                # node is already there, nothing important.
                self.warn("NodeID exists: %s" % node)
            else:
                err_type = iq.error.getAttribute('type') or ""
                self.error("Can not create node: %s (error type: %s)" %  (node, err_type))
        elif node:
            # no errors
            # try subscribing to the node for debugging purposes
            self.subscribe_to_node(node)


    def delete_node(self, node):
        """ Ask the service to delete node. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        delete = pubsub.addElement("delete")
        delete['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_delete_node(self, iq):
        """ Handle a delete reply. """
        self.__pop_request(iq)

    def __send_chat(self, to, text):
        """ Send a message stanza with text as its body to the JID `to`. """
        n = domish.Element((None, "message"))
        n['to'] = to
        n['from'] = self.id.full()
        n.addElement("body", content = text)
        self.xmlstream.send(n)

    def message_chat(self, m):
        """ Answer the chat commands understood by this client.

        "list groups"         -- replies with the node names returned by a
                                 discovery query, one per line
        "configuration NODE"  -- replies with the raw XML of the configure
                                 reply for NODE
        Any other message is passed on to BaseClient.message_chat().
        """
        body = ""
        for e in m.elements():
            if e.name == "body":
                body = "%s" % e
                break

        # NOTE: relaying published pubsub events to chat peers used to be
        # sketched here as commented-out code; it was never enabled.

        if body == "list groups":
            def list_groups(iq):
                # items without a node attribute are skipped
                names = [i.getAttribute('node') for i in iq.query.elements()]
                reply = "".join(["%s\n" % name for name in names if name is not None])
                self.__send_chat(m['from'], reply)

            self.add_result_hook("discover", list_groups)
            self.discover()

        elif body.startswith("configuration"):
            # "configuration NODE"
            words = body.split()
            node = ""
            if len(words) > 1:
                node = words[1]

            def get_configuration(iq):
                self.__send_chat(m['from'], iq.toXml())

            self.add_result_hook("configure", get_configuration)
            self.configure_node(node)

        else:
            BaseClient.message_chat(self, m)
303
304
class Slicemgr(xmlrpc.XMLRPC, PubSubClient):
    """ XML-RPC front-end that maps slices and resources to pubsub nodes.

    Node names are built by joining with "/":
        DOMAIN/<slice>
        DOMAIN/<slice>/RESOURCES
        DOMAIN/<slice>/RESOURCES/<resource>
    The XML-RPC methods only queue (method, parameter) pairs; the queue is
    emptied by flush_commands(), which is meant to be called periodically.
    """

    DOMAIN = "/OMF"
    RESOURCES = 'resources'

    def __init__(self, id, secret, verbose = False, log = None):
        """ Same arguments as PubSubClient.__init__. """
        xmlrpc.XMLRPC.__init__(self, allowNone=True)
        PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
        self.command_queue = Queue.Queue()

        xmlrpc.addIntrospection(self)

    def xmlrpc_createSlice(self, slice):
        """ XML-RPC createSlice: see create_slice(). """
        self.create_slice(slice)

    def xmlrpc_addResource(self, slice, resource):
        """ XML-RPC addResource: see add_resource(). """
        self.add_resource(slice, resource)

    def xmlrpc_deleteSlice(self, slice):
        """ XML-RPC deleteSlice: see delete_slice(). """
        self.delete_slice(slice)

    def xmlrpc_removeResource(self, slice, resource):
        """ XML-RPC removeResource: see delete_resource(). """
        self.delete_resource(slice, resource)


    def flush_commands(self):
        """ Run every queued command, one after the other.

        A failing command is logged and skipped: an exception escaping from
        here would otherwise propagate to the periodic caller and could stop
        any further flushing.
        """
        # Commands need a stream to send on (xmlstream) and the pending
        # request table (requests); while either is missing, leave the queue
        # untouched so the commands are retried on the next call.
        if getattr(self, "xmlstream", None) is None:
            return
        if getattr(self, "requests", None) is None:
            return

        while True:
            try:
                (meth, param) = self.command_queue.get_nowait()
            except Queue.Empty:
                break
            try:
                meth(param)
            except Exception as e:
                name = getattr(meth, "__name__", meth)
                self.error("Command %s(%r) failed: %s" % (name, param, e))

    def create_slice(self, slice):
        """ Queue the creation of the slice node and of its resources node. """
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))

    def add_resource(self, slice, resource):
        """ Queue the creation of the node of one resource of a slice. """
        resname = "/".join([self.DOMAIN,slice,self.RESOURCES,resource])
        self.command_queue.put(( self.create_node, resname ))

    def delete_slice(self, slice):
        """ Queue the deletion of a slice's nodes.

        A discovery query is sent right away; when its reply arrives, one
        deletion is queued for every listed node whose name starts with the
        slice's resources prefix.
        """
        slice_prefix = "/".join([self.DOMAIN,slice])
        resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])
        def delete_slice_resources(iq):
            for i in iq.query.elements():
                # items that carry no node attribute are ignored
                node = i.getAttribute('node')
                if node and node.startswith(resource_prefix):
                    self.command_queue.put((self.delete_node, node))

        self.add_result_hook("discover", delete_slice_resources)
        self.discover()

        # NOTE(review): these two deletions are queued immediately, i.e.
        # before the per-resource deletions above, which are only queued
        # once the discovery reply arrives -- confirm this order is intended.
        self.command_queue.put(( self.delete_node, resource_prefix) )
        self.command_queue.put(( self.delete_node, slice_prefix) )

    def delete_resource(self, slice, resource):
        """ Queue the deletion of the node of one resource of a slice. """
        self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
361         
362
363
if __name__ == "__main__":

    # Script entry point: read the PLC configuration, build the slice
    # manager and run the reactor with both the XMPP client connection and
    # the local XML-RPC listener.
    config = Config("/etc/planetlab/plc_config")

    xmppserver = config.PLC_OMF_XMPP_SERVER
    xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
    xmpppass = config.PLC_OMF_XMPP_PASSWORD
    # one log file per month
    monthstring=time.strftime("%Y-%m")
    slicemgr = Slicemgr(xmppuser, xmpppass,
                        log=open("/var/log/omf/pubsub-client-%s.log"%monthstring, "a"),
                        # used to be verbose=True but that amounts to huge totally helpless logs, so..
                        verbose=False)

    t = task.LoopingCall(slicemgr.flush_commands)
    t.start(5.0) # check every 5 seconds

    # Create the root nodes once the stream is authenticated, rather than
    # after a fixed one second delay that the login may or may not have
    # completed within.  This runs again after every re-authentication.
    def create_root_nodes():
        slicemgr.create_node("/OMF")
        slicemgr.create_node("/OMF/SYSTEM")

    def schedule_root_nodes(xs):
        # Deferred to the next reactor iteration so that all the other
        # observers of the authentication event have run first.
        reactor.callLater(0, create_root_nodes)

    slicemgr.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, schedule_root_nodes)

    reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
    reactor.listenTCP(5053, server.Site(slicemgr), interface="localhost")
    reactor.run(installSignalHandlers=True)
386
387
388