OMF integration for plcapi, only activated when PLC_OMF_ENABLED is true.
[plcapi.git] / omf / omf-slicemgr.py
1 # Baris Metin <tmetin@sophia.inria.fr>
2
3 import os
4 import sys
5 import Queue
6 from twisted.words.xish import domish
7 from twisted.web import xmlrpc, server
8 from twisted.internet import reactor, task
9 from twisted.words.protocols.jabber import xmlstream, client, jid
10
11 sys.path.append("/usr/share/plc_api/")
12 from PLC.Config import Config
13
14
class BaseClient(object):
    """ Base XMPP client: handles authentication and basic presence/message requests.

    The constructor builds a twisted XMPPClientFactory (available as
    self.factory) and registers the event_* methods as bootstrap
    observers on it.  The caller is responsible for connecting the
    factory to the reactor.
    """

    def __init__(self, id, secret, verbose = False, log = None):
        """
        id      -- our JID, either a jid.JID instance or its string form
        secret  -- password handed to the client factory
        verbose -- when true, every chunk of raw traffic is logged
        log     -- file-like object used for logging (defaults to sys.stdout)
        """
        if isinstance(id, (str, unicode)):
            id = jid.JID(id)
        x = client.XMPPClientFactory(id, secret)
        x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
        x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
        x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
        x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)
        self.id = id
        self.factory = x
        self.verbose = verbose
        self.log = log or sys.stdout
        # replaced by the live stream in event_connected
        self.xmlstream = None

    def __rawDataIN(self, buf):
        """ Log a chunk of incoming raw data (verbose mode only). """
        if self.verbose: self.msg("RECV: %s" % buf)

    def __rawDataOUT(self, buf):
        """ Log a chunk of outgoing raw data (verbose mode only). """
        if self.verbose: self.msg("SEND: %s" % buf)

    def msg(self, msg):
        """ Write one line to the log and flush it right away. """
        self.log.write("%s\n" % msg)
        self.log.flush()

    def error(self, msg):
        """ Log msg with an ERROR prefix. """
        self.msg("ERROR: %s" % msg)

    def warn(self, msg):
        """ Log msg with a WARN prefix. """
        self.msg("WARN: %s" % msg)

    def info(self, msg):
        """ Log msg with an INFO prefix. """
        self.msg("INFO: %s" % msg)

    def event_connected(self, xs):
        """ Stream connected: install the traffic loggers and keep the stream. """
        # log all traffic
        xs.rawDataInFn = self.__rawDataIN
        xs.rawDataOutFn = self.__rawDataOUT
        self.xmlstream = xs

    def event_disconnected(self, xs):
        """ Stream ended: nothing to do at this level. """
        pass

    def event_init_failed(self, xs):
        """ Stream initialization failed: log it. """
        self.error("Init Failed")

    def event_authenticated(self, xs):
        """ Authenticated: announce our presence and register stanza handlers. """
        presence = domish.Element(("jabber:client", "presence"))
        presence.addElement("show", content="dnd")
        presence.addElement("status", content="man at work")
        xs.send(presence)

        # add protocol handlers
        xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
        xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
        # this path used to be misspelt ("/precence"), so the handler
        # below was never invoked
        xs.addObserver("/presence", self.presence)
        xs.addObserver("/message[@type='chat']", self.message_chat)

    def presence_subscribe(self, m):
        """ Grant every incoming subscription request. """
        self.info("%s request to add us, granting." % m['from'])
        p = domish.Element(("jabber:client", "presence"))
        p['from'], p['to'] = m['to'], m['from']
        p['type'] = "subscribed"
        self.xmlstream.send(p)

    def presence_unsubscribe(self, m):
        """ The peer removed us: ask to be subscribed again. """
        # try to re-subscribe
        self.info("%s removed us, trying to re-authenticate." % m['from'])
        p = domish.Element(("jabber:client", "presence"))
        p['from'], p['to'] = m['to'], m['from']
        p['type'] = "subscribe"
        self.xmlstream.send(p)

    def presence(self, m):
        """ Answer a plain presence stanza with our own "busy" presence.

        Nothing is sent for stanzas that carry a type attribute (the
        subscribe/unsubscribe ones have dedicated handlers), that lack
        a sender or recipient, or that come from our own account.
        """
        if m.getAttribute('type') is not None:
            return
        sender = m.getAttribute('from')
        recipient = m.getAttribute('to')
        if sender is None or recipient is None:
            return
        # same bare JID on both sides: this is our own presence coming
        # back to us, answering it would make us talk to ourselves forever
        if sender.split("/", 1)[0] == recipient.split("/", 1)[0]:
            return

        p = domish.Element(("jabber:client", "presence"))
        p['from'], p['to'] = recipient, sender
        # the children used to be added to an undefined name ("presence")
        p.addElement("show", content="dnd")
        p.addElement("status", content="man at work")
        self.xmlstream.send(p)

    def message_chat(self, m):
        """ Reply to any chat message with a canned "busy" answer. """
        n = domish.Element((None, "message"))
        n['to'], n['from'] = m['from'], m['to']
        n.addElement("body", content = "don't have time to chat. working!")
        self.xmlstream.send(n)
101
102     
class PubSubClient(BaseClient):
    """ PubSub (XEP 0060) implementation

    Outstanding requests are tracked in self.requests (iq id -> node
    name).  Callers may register a per-item callback for discovery
    results with add_result_hook("discover", callback).
    """

    def __init__(self, id, secret, verbose = False, log = None, pubsub_service = None):
        """
        pubsub_service -- JID of the pubsub component requests are sent
                          to; defaults to "pubsub.<host of our JID>".
        The other arguments are passed on to BaseClient.
        """
        BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
        self.hooks = {}
        # created here (and not only on authentication) so that issuing
        # a request early does not fail on a missing attribute
        self.requests = {}
        # used to be hard-coded to one particular server
        self.pubsub_service = pubsub_service or "pubsub.%s" % self.id.host

    def add_result_hook(self, hook_to, hook):
        """ Register hook under the name hook_to, replacing any previous one. """
        self.hooks[hook_to] = hook

    def delete_result_hook(self, hook_to):
        """ Remove the hook registered under hook_to, if there is one. """
        self.hooks.pop(hook_to, None)

    def event_authenticated(self, xs):
        """ Authenticated: reset request tracking and register the result handlers. """
        BaseClient.event_authenticated(self, xs)
        self.requests = {}
        xs.addObserver("/iq/pubsub/create", self.result_create_node)
        xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
        xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)

    def __iq(self, t="get"):
        """ Build an iq stanza of type t, addressed to the pubsub service. """
        iq = domish.Element((None, "iq"))
        iq['from'] = self.id.full()
        iq['to'] = self.pubsub_service
        iq['type'] = t
        iq.addUniqueId()
        return iq

    def discover(self, node = None):
        """ Send a disco#items query, optionally restricted to node. """
        iq = self.__iq("get")
        query = iq.addElement("query")
        query['xmlns'] = "http://jabber.org/protocol/disco#items"
        if node:
            query['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_discover(self, iq):
        """ Handle a disco#items stanza: feed every item to the 'discover' hook.

        Stanzas whose id does not belong to one of our own requests
        are ignored.
        """
        iq_id = iq.getAttribute('id')
        if iq_id not in self.requests:
            return
        node = self.requests.pop(iq_id)
        if self.verbose: self.info("Items for node: %s" % node)

        hook = self.hooks.get('discover', None)
        for i in iq.query.elements():
            if self.verbose: self.msg(i.toXml())
            if hook: hook(i)

    def create_node(self, node = None):
        """ Ask the pubsub service to create node (no node attribute is sent when node is empty). """
        iq = self.__iq("set")
        pubsub = iq.addElement("pubsub")
        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
        create = pubsub.addElement("create")
        if node:
            create['node'] = node
        # empty <configure/> element, sent along with every create request
        pubsub.addElement("configure")
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_create_node(self, iq):
        """ Handle the answer to a create request: just forget the request. """
        # an unknown id is not an error here, there is simply nothing to forget
        self.requests.pop(iq.getAttribute('id'), None)

    def delete_node(self, node):
        """ Ask the pubsub service to delete node. """
        iq = self.__iq("set")
        pubsub = iq.addElement("pubsub")
        pubsub['xmlns'] = "http://jabber.org/protocol/pubsub#owner"
        delete = pubsub.addElement("delete")
        delete['node'] = node
        self.requests[iq['id']] = node
        self.xmlstream.send(iq)

    def result_delete_node(self, iq):
        """ Handle the answer to a delete request: just forget the request. """
        self.requests.pop(iq.getAttribute('id'), None)
190
191
192
193
class Slicemgr(PubSubClient, xmlrpc.XMLRPC):
    """ Slice manager: exposes slice/resource operations over XML-RPC and
    turns them into pubsub node operations.

    Operations are not sent right away: they are stored in
    self.command_queue as (method, parameter) pairs and sent by
    flush_commands(), which is meant to be called periodically.
    """

    DOMAIN = "/OMF"
    RESOURCES = 'resources'

    def __init__(self, id, secret, verbose = False, log = None):
        """ Arguments are passed on to PubSubClient. """
        PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
        # the XML-RPC side has its own state to set up as well
        xmlrpc.XMLRPC.__init__(self)
        # our xmlrpc_* methods return None
        self.allowNone = True

        self.command_queue = Queue.Queue()
        # true between authentication and the end of the stream
        self.authenticated = False
        # resource prefixes of slices being deleted, waiting for the
        # next discovery result
        self.pending_resource_prefixes = []

    def xmlrpc_createSlice(self, slice):
        self.create_slice(slice)

    def xmlrpc_addResource(self, slice, resource):
        self.add_resource(slice, resource)

    def xmlrpc_deleteSlice(self, slice):
        self.delete_slice(slice)

    def xmlrpc_removeResource(self, slice, resource):
        self.delete_resource(slice, resource)

    def event_authenticated(self, xs):
        """ Authenticated: queued commands may be sent from now on. """
        PubSubClient.event_authenticated(self, xs)
        self.authenticated = True

    def event_disconnected(self, xs):
        """ Stream ended: keep the commands queued until we are back. """
        PubSubClient.event_disconnected(self, xs)
        self.authenticated = False

    def flush_commands(self):
        """ Run every queued command.

        Does nothing while we are not authenticated, so commands
        received early stay in the queue.  A failing command is logged
        and dropped; it does not prevent the following ones from running.
        """
        if not self.authenticated:
            return
        while not self.command_queue.empty():
            (meth, param) = self.command_queue.get()
            try:
                meth(param)
            except Exception:
                # an exception escaping from here would propagate to
                # whatever calls us periodically
                self.error("command %s(%s) failed: %s" %
                           (getattr(meth, "__name__", meth), param, sys.exc_info()[1]))

    def create_slice(self, slice):
        """ Queue the creation of the slice node and of its resources node. """
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))

    def add_resource(self, slice, resource):
        """ Queue the creation of the node of resource within slice. """
        self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))

    def _queue_resource_deletion(self, item):
        """ 'discover' hook: queue the deletion of item's node when it
        sits under one of the pending resource prefixes. """
        node = item.getAttribute('node')
        if node is None:
            return
        for prefix in self.pending_resource_prefixes:
            if node.startswith(prefix):
                self.command_queue.put(( self.delete_node, node ))
                break

    def result_discover(self, iq):
        """ Process a discovery result, then retire the deletion hook.

        The hook has to stay in place until the result has been
        processed; removing it right after sending the query (as was
        done before) meant it never saw any item.
        """
        PubSubClient.result_discover(self, iq)
        if self.hooks.get("discover") == self._queue_resource_deletion:
            del self.pending_resource_prefixes[:]
            self.delete_result_hook("discover")

    def delete_slice(self, slice):
        """ Queue the deletion of the slice node, of its resources node,
        and (once discovered) of every node below the resources node. """
        slice_prefix = "/".join([self.DOMAIN,slice])
        resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])

        # trailing "/" so that only nodes below the resources node match;
        # the resources node itself is deleted explicitly below
        self.pending_resource_prefixes.append(resource_prefix + "/")
        self.add_result_hook("discover", self._queue_resource_deletion)
        # queued like everything else, so it is only sent once connected
        self.command_queue.put(( self.discover, None ))

        self.command_queue.put(( self.delete_node, resource_prefix) )
        self.command_queue.put(( self.delete_node, slice_prefix) )

    def delete_resource(self, slice, resource):
        """ Queue the deletion of the node of resource within slice. """
        self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
249         
250
251
if __name__ == "__main__":

    config = Config("/etc/planetlab/plc_config")

    # config is a plain local here; these lines used to read
    # "self.config", which does not exist at module level
    xmppserver = config.PLC_OMF_XMPP_SERVER
    xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
    xmpppass = config.PLC_OMF_XMPP_PASSWORD
    slicemgr = Slicemgr(xmppuser, xmpppass,
                        log=open("/var/log/omf/pubsub_client.log", "a"),
                        verbose=True)

    t = task.LoopingCall(slicemgr.flush_commands)
    # check every 5 seconds; if the loop ever stops on an error, say so
    # in the log instead of dying silently
    d = t.start(5.0)
    d.addErrback(lambda failure: slicemgr.error("flush_commands stopped: %s" % failure.getErrorMessage()))
    # XMPP connection to the server, XML-RPC interface on port 5053
    reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
    reactor.listenTCP(5053, server.Site(slicemgr))
    reactor.run(installSignalHandlers=True)
268
269
270