added create_network(), delete_network(), create_subnet(), delete_subnet(), process_t...
[plcapi.git] / omf / omf_slicemgr.py
1 #!/usr/bin/python
2 # Baris Metin <tmetin@sophia.inria.fr>
3
4 import os
5 import sys
6 import Queue
7 from twisted.words.xish import domish
8 from twisted.web import xmlrpc, server
9 from twisted.internet import reactor, task
10 from twisted.words.protocols.jabber import xmlstream, client, jid
11
12 sys.path.append("/usr/share/plc_api/")
13 from PLC.Config import Config
14
15
class BaseClient(object):
    """ Base XMPP client: handles authentication and basic presence/message requests.

    The client owns an XMPPClientFactory whose stream events are routed to
    the event_* methods below.  All diagnostics go through msg() and its
    error()/warn()/info() wrappers, which write to the configured log.
    """

    def __init__(self, id, secret, verbose = False, log = None):
        """ Prepare the connection factory and the logging settings.

        id      -- our JID, as a jid.JID or as a string to be parsed into one
        secret  -- password passed to the XMPPClientFactory
        verbose -- when true, raw incoming/outgoing stream data is logged
        log     -- file-like object to write to; sys.stdout when omitted
        """
        if isinstance(id, (str, unicode)):
            id = jid.JID(id)

        factory = client.XMPPClientFactory(id, secret)
        # stream event -> handler, registered in this order
        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.event_connected),
            (xmlstream.STREAM_END_EVENT, self.event_disconnected),
            (xmlstream.INIT_FAILED_EVENT, self.event_init_failed),
            (xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated),
        )
        for stream_event, handler in bootstraps:
            factory.addBootstrap(stream_event, handler)

        self.id = id
        self.factory = factory
        self.verbose = verbose
        if log:
            self.log = log
        else:
            self.log = sys.stdout

    def __rawDataIN(self, buf):
        """ Log raw data received from the stream (verbose mode only). """
        if not self.verbose:
            return
        self.msg("RECV: %s" % buf)

    def __rawDataOUT(self, buf):
        """ Log raw data sent to the stream (verbose mode only). """
        if not self.verbose:
            return
        self.msg("SEND: %s" % buf)

    def msg(self, msg):
        """ Write one line to the log and flush it immediately. """
        out = self.log
        out.write("%s\n" % msg)
        out.flush()

    def error(self, msg):
        """ Log msg with an ERROR prefix. """
        self.msg("ERROR: %s" % msg)

    def warn(self, msg):
        """ Log msg with a WARN prefix. """
        self.msg("WARN: %s" % msg)

    def info(self, msg):
        """ Log msg with an INFO prefix. """
        self.msg("INFO: %s" % msg)

    def event_connected(self, xs):
        """ Stream connected: hook the traffic loggers and keep the stream. """
        # log all traffic
        xs.rawDataInFn = self.__rawDataIN
        xs.rawDataOutFn = self.__rawDataOUT
        self.xmlstream = xs

    def event_disconnected(self, xs):
        """ Stream ended: nothing to do. """
        return None

    def event_init_failed(self, xs):
        """ Stream initialization failed: just report it. """
        self.error("Init Failed")

    def event_authenticated(self, xs):
        """ Announce our presence and install the stanza handlers. """
        announce = domish.Element(("jabber:client", "presence"))
        announce.addElement("show", content="dnd")
        announce.addElement("status", content="man at work")
        xs.send(announce)

        # add protocol handlers
        observers = (
            ("/presence[@type='subscribe']", self.presence_subscribe),
            ("/presence[@type='unsubscribe']", self.presence_unsubscribe),
            ("/presence", self.presence),
            ("/message[@type='chat']", self.message_chat),
        )
        for xpath, handler in observers:
            xs.addObserver(xpath, handler)

    def _presence_reply(self, m):
        """ Build a <presence/> whose from/to are those of m, swapped. """
        reply = domish.Element(("jabber:client", "presence"))
        addressed_to, sent_by = m['to'], m['from']
        reply['from'] = addressed_to
        reply['to'] = sent_by
        return reply

    def presence_subscribe(self, m):
        """ Grant every subscription request. """
        self.info("%s request to add us, granting." % m['from'])
        reply = self._presence_reply(m)
        reply['type'] = "subscribed"
        self.xmlstream.send(reply)

    def presence_unsubscribe(self, m):
        """ Somebody dropped us: ask to be subscribed again. """
        # try to re-subscribe
        self.info("%s removed us, trying to re-authenticate." % m['from'])
        reply = self._presence_reply(m)
        reply['type'] = "subscribe"
        self.xmlstream.send(reply)

    def presence(self, m):
        """ Answer any presence stanza with our own "dnd" status. """
        reply = self._presence_reply(m)
        # NOTE (Thierry): the two calls below initially read
        # presence.addElement; using the reply element is a guess at the intent.
        reply.addElement("show", content="dnd")
        reply.addElement("status", content="man at work")
        self.xmlstream.send(reply)

    def message_chat(self, m):
        """ Reply to a chat message with a canned "busy" answer. """
        answer = domish.Element((None, "message"))
        answer['to'] = m['from']
        answer['from'] = self.id.full()
        answer.addElement("body", content = "don't have time to chat. working!")
        self.xmlstream.send(answer)
104
105     
class PubSubClient(BaseClient):
    """ PubSub (XEP 0060) implementation

    Adds node discovery, creation, deletion, subscription, publishing and
    configuration requests on top of BaseClient.  Each request is an <iq/>
    addressed to "pubsub.<host of our JID>"; its id is remembered in
    self.requests (iq id -> node name) until one of the result_* observers
    sees the reply.  Callers can register one-shot result hooks (keys
    'discover' and 'configure') to receive the reply stanza.

    NOTE(review): a reply only reaches the result_* observers when it
    matches the xpath they were registered with; bookkeeping entries for
    replies that do not match are never removed -- confirm against the
    server's actual replies.
    """

    def __init__(self, id, secret, verbose = False, log = None):
        """ Same arguments as BaseClient.__init__. """
        BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
        self.hooks = {}
        # Exists from construction on, so the bookkeeping in the request
        # methods never fails with AttributeError before authentication.
        self.requests = {}

    def add_result_hook(self, hook_to, hook):
        """ Register hook(iq) to be called once for the next hook_to result. """
        self.hooks[hook_to] = hook

    def delete_result_hook(self, hook_to):
        """ Remove the hook registered under hook_to, if there is one. """
        self.hooks.pop(hook_to, None)

    def event_authenticated(self, xs):
        """ Install the pubsub reply observers on the authenticated stream. """
        BaseClient.event_authenticated(self, xs)
        # start from an empty table on every (re)authentication
        self.requests = {}
        xs.addObserver("/iq/pubsub/create", self.result_create_node)
        xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
        xs.addObserver("/iq/pubsub/publish", self.result_publish_to_node)
        xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
        xs.addObserver("/iq/pubsub/subscription[@subscription='subscribed']", self.result_subscribe_to_node)
        xs.addObserver("/iq/pubsub/configure/x", self.result_configure_node)
        xs.addObserver("/iq/pubsub/configure/error", self.result_configure_node)

    def __iq(self, t="get"):
        """ Return a new <iq type=t/> from us to the pubsub service, with a unique id. """
        iq = domish.Element((None, "iq"))
        iq['from'] = self.id.full()
        iq['to'] = "pubsub.%s" % self.id.host
        iq['type'] = t
        iq.addUniqueId()
        return iq

    def __add_pubsub(self, iq):
        """ Append a <pubsub/> child to iq and return it. """
        pubsub = iq.addElement("pubsub")
        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
        return pubsub

    def __forget_request(self, iq):
        """ Drop the bookkeeping entry for the reply iq.

        Returns the node name recorded for the request, or None when the
        reply has no id or an id we did not record (e.g. a stanza that
        merely matches one of our observers).
        """
        return self.requests.pop(iq.getAttribute('id'), None)

    def __run_hook(self, hook_to, iq):
        """ Call and consume the one-shot hook registered under hook_to. """
        # Removed before the call so that a hook may register a successor.
        hook = self.hooks.pop(hook_to, None)
        if hook:
            hook(iq)


    def discover(self, node = None):
        """ Send a disco#items query, for the whole service or for one node. """
        iq = self.__iq("get")
        query = iq.addElement("query")
        query['xmlns'] = "http://jabber.org/protocol/disco#items"
        if node:
            query['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_discover(self, iq):
        """ Hand a disco#items reply to the 'discover' hook, if any. """
        try:
            self.__run_hook('discover', iq)
        finally:
            self.__forget_request(iq)


    def subscribe_to_node(self, node):
        """ Ask the service to subscribe our full JID to node. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        subscribe = pubsub.addElement("subscribe")
        subscribe['node'] = node
        subscribe['jid'] = self.id.full()
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_subscribe_to_node(self, iq):
        """ Subscription confirmed: just clear the bookkeeping entry. """
        self.__forget_request(iq)


    def publish_to_node(self, node, payload):
        """ Publish payload as the content of a single <item/> on node. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        publish = pubsub.addElement("publish")
        publish['node'] = node
        publish.addElement("item", content=payload)
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_publish_to_node(self, iq):
        """ Publish reply seen: just clear the bookkeeping entry. """
        self.__forget_request(iq)


    # TODO: ejabberd doesn't have the node configuration feature implemented yet!
    def configure_node(self, node, fields=None):
        """ Send a <configure/> request for node; fields is not used yet. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        configure = pubsub.addElement("configure")
        configure['node'] = node

        # TODO: add fields

        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_configure_node(self, iq):
        """ Hand a configure reply to the 'configure' hook, if any. """
        try:
            self.__run_hook('configure', iq)
        finally:
            self.__forget_request(iq)


    def create_node(self, node = None):
        """ Ask the service to create node.

        Without a node name the <create/> element carries no node
        attribute, leaving the choice of the id to the service.
        """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        create = pubsub.addElement("create")
        if node:
            create['node'] = node
        pubsub.addElement("configure")
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_create_node(self, iq):
        """ Log the outcome of a node creation; subscribe to it on success. """
        node = self.__forget_request(iq)
        if node is None:
            # No name on record (none was given, or the reply is not for
            # one of our requests): use the one carried by the reply.
            pubsub = iq.pubsub
            created = None
            if pubsub is not None:
                created = pubsub.create
            if created is not None:
                node = created.getAttribute('node')

        if iq.error:
            if iq.error.conflict:
                # node is already there, nothing important.
                self.warn("NodeID exists: %s" % node)
            else:
                # a missing type attribute must not raise here
                err_type = iq.error.getAttribute('type', "") or ""
                self.error("Can not create node: %s (error type: %s)" %  (node, err_type))
            return

        # no errors
        # try subscribing to the node for debugging purposes
        if node:
            self.subscribe_to_node(node)
        else:
            self.warn("Node created but its id is unknown, not subscribing.")


    def delete_node(self, node):
        """ Ask the service to delete node. """
        iq = self.__iq("set")
        pubsub = self.__add_pubsub(iq)
        delete = pubsub.addElement("delete")
        delete['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_delete_node(self, iq):
        """ Delete reply seen: just clear the bookkeeping entry. """
        self.__forget_request(iq)

    def message_chat(self, m):
        """ Handle the chat commands understood by this client.

        "list groups"        -- reply with the node names found by discover()
        "configuration NODE" -- reply with the XML of the configure result
        Anything else gets BaseClient's canned answer.
        """
        body = ""
        for e in m.elements():
            if e.name == "body":
                body = "%s" % e
                break

        def reply_with(text):
            # send text back to whoever sent us m
            n = domish.Element((None, "message"))
            n['to'] = m['from']
            n['from'] = self.id.full()
            n.addElement("body", content = text)
            self.xmlstream.send(n)

        if body == "list groups":
            def list_groups(iq):
                # items without a node attribute are left out of the listing
                names = [i.getAttribute('node') for i in iq.query.elements()]
                reply_with("".join(["%s\n" % name for name in names if name]))

            self.add_result_hook("discover", list_groups)
            self.discover()

        elif body.startswith("configuration"):
            # "configuration NODE"
            words = body.split()
            if len(words) > 1:
                node = words[1]
            else:
                node = ""

            def get_configuration(iq):
                reply_with(iq.toXml())

            self.add_result_hook("configure", get_configuration)
            self.configure_node(node)

        else:
            BaseClient.message_chat(self, m)
302
303
class Slicemgr(xmlrpc.XMLRPC, PubSubClient):
    """ XML-RPC front end that maps slices and resources to pubsub nodes.

    A slice owns the nodes DOMAIN/<slice> and DOMAIN/<slice>/RESOURCES;
    each of its resources owns DOMAIN/<slice>/RESOURCES/<resource>.
    The XML-RPC methods only queue (method, node) commands; the commands
    are executed when flush_commands() is called.
    """

    DOMAIN = "/OMF"
    RESOURCES = 'resources'

    def __init__(self, id, secret, verbose = False, log = None):
        """ Same arguments as PubSubClient.__init__. """
        xmlrpc.XMLRPC.__init__(self, allowNone=True)
        PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
        self.command_queue = Queue.Queue()

        xmlrpc.addIntrospection(self)

    def xmlrpc_createSlice(self, slice):
        """ XML-RPC createSlice: queue the creation of the slice's nodes. """
        self.create_slice(slice)

    def xmlrpc_addResource(self, slice, resource):
        """ XML-RPC addResource: queue the creation of a resource node. """
        self.add_resource(slice, resource)

    def xmlrpc_deleteSlice(self, slice):
        """ XML-RPC deleteSlice: queue the deletion of the slice's nodes. """
        self.delete_slice(slice)

    def xmlrpc_removeResource(self, slice, resource):
        """ XML-RPC removeResource: queue the deletion of a resource node. """
        self.delete_resource(slice, resource)


    def flush_commands(self):
        """ Run every queued command, in the order it was queued.

        A command that raises is logged and dropped, and the remaining
        commands still run: letting the exception escape would abort the
        flush and stop a LoopingCall driving this method.
        """
        while not self.command_queue.empty():
            (meth, param) = self.command_queue.get()
            try:
                meth(param)
            except Exception:
                self.error("Command %s(%r) failed: %s" %
                           (getattr(meth, '__name__', meth), param, sys.exc_info()[1]))

    def create_slice(self, slice):
        """ Queue the creation of the slice node and of its resources node. """
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))

    def add_resource(self, slice, resource):
        """ Queue the creation of the node of one resource of the slice. """
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))

    def delete_slice(self, slice):
        """ Queue the deletion of every node belonging to the slice.

        The resources node and the slice node are queued right away.  The
        per-resource nodes are not known here: a discovery request is sent
        and its reply queues one deletion per node found under the
        slice's resources node.
        """
        slice_prefix = "/".join([self.DOMAIN,slice])
        resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])
        # Only nodes strictly below the resources node: the resources node
        # itself is queued explicitly below, and a node that merely shares
        # the prefix text does not belong to this slice's resources.
        child_prefix = resource_prefix + "/"

        def delete_slice_resources(iq):
            for i in iq.query.elements():
                # items without a node attribute cannot be pubsub nodes of ours
                node = i.getAttribute('node')
                if node and node.startswith(child_prefix):
                    self.command_queue.put((self.delete_node, node))

        self.add_result_hook("discover", delete_slice_resources)
        self.discover()

        self.command_queue.put(( self.delete_node, resource_prefix) )
        self.command_queue.put(( self.delete_node, slice_prefix) )

    def delete_resource(self, slice, resource):
        """ Queue the deletion of the node of one resource of the slice. """
        self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
359         
360
361
if __name__ == "__main__":

    # The XMPP account to use is described in the PLC configuration.
    plc_config = Config("/etc/planetlab/plc_config")

    server_host = plc_config.PLC_OMF_XMPP_SERVER
    account = "@".join([plc_config.PLC_OMF_XMPP_USER, server_host])
    password = plc_config.PLC_OMF_XMPP_PASSWORD

    # Append to the log file and log the raw traffic as well.
    log_file = open("/var/log/omf/pubsub_client.log", "a")
    slicemgr = Slicemgr(account, password, verbose=True, log=log_file)

    # Run the queued commands every 5 seconds.
    flusher = task.LoopingCall(slicemgr.flush_commands)
    flusher.start(5.0)

    # Request the two base nodes one second after the reactor starts.
    for base_node in ("/OMF", "/OMF/SYSTEM"):
        reactor.callLater(1, slicemgr.create_node, base_node)

    # XMPP connection to the server, XML-RPC interface on localhost only.
    reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
    reactor.listenTCP(5053, server.Site(slicemgr), interface="localhost")
    reactor.run(installSignalHandlers=True)
382
383
384