start/stop omf-slicemgr properly
[plcapi.git] / omf / omf-slicemgr.py
1 #!/usr/bin/python
2 # Baris Metin <tmetin@sophia.inria.fr>
3
4 import os
5 import sys
6 import Queue
7 from twisted.words.xish import domish
8 from twisted.web import xmlrpc, server
9 from twisted.internet import reactor, task
10 from twisted.words.protocols.jabber import xmlstream, client, jid
11
12 sys.path.append("/usr/share/plc_api/")
13 from PLC.Config import Config
14
15
16 class BaseClient(object):
17     """ Base XMPP client: handles authentication and basic presence/message requests. """
18     def __init__(self, id, secret, verbose = False, log = None):
19
20         if isinstance(id, (str, unicode)):
21             id = jid.JID(id)
22         x = client.XMPPClientFactory(id, secret)
23         x.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.event_connected)
24         x.addBootstrap(xmlstream.STREAM_END_EVENT, self.event_disconnected)
25         x.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.event_init_failed)
26         x.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.event_authenticated)
27         self.id = id
28         self.factory = x
29         self.verbose = verbose
30         self.log = log or sys.stdout
31
32     def __rawDataIN(self, buf):
33         if self.verbose: self.msg("RECV: %s" % buf)
34
35     def __rawDataOUT(self, buf):
36         if self.verbose: self.msg("SEND: %s" % buf)
37
38     def msg(self, msg):
39         self.log.write("%s\n" % msg)
40         self.log.flush()
41
42     def error(self, msg):
43         self.msg("ERROR: %s" % msg)
44
45     def warn(self, msg):
46         self.msg("WARN: %s" % msg)
47
48     def info(self, msg):
49         self.msg("INFO: %s" % msg)
50
51     def event_connected(self, xs):
52         # log all traffic
53         xs.rawDataInFn = self.__rawDataIN
54         xs.rawDataOutFn = self.__rawDataOUT
55         self.xmlstream = xs
56         
57     def event_disconnected(self, xs):
58         pass
59
60     def event_init_failed(self, xs):
61         self.error("Init Failed")
62
63     def event_authenticated(self, xs):
64         presence = domish.Element(("jabber:client", "presence"))
65         presence.addElement("show", content="dnd")
66         presence.addElement("status", content="man at work")
67         xs.send(presence)
68
69         # add protocol handlers
70         xs.addObserver("/presence[@type='subscribe']", self.presence_subscribe)
71         xs.addObserver("/presence[@type='unsubscribe']", self.presence_unsubscribe)
72         xs.addObserver("/precence", self.presence)
73         xs.addObserver("/message[@type='chat']", self.message_chat)
74
75     def presence_subscribe(self, m):
76         self.info("%s request to add us, granting." % m['from'])
77         p = domish.Element(("jabber:client", "presence"))
78         p['from'], p['to'] = m['to'], m['from']
79         p['type'] = "subscribed"
80         self.xmlstream.send(p)
81
82     def presence_unsubscribe(self, m):
83         # try to re-subscribe
84         self.info("%s removed us, trying to re-authenticate." % m['from'])
85         p = domish.Element(("jabber:client", "presence"))
86         p['from'], p['to'] = m['to'], m['from']
87         p['type'] = "subscribe"
88         self.xmlstream.send(p)
89
90     def presence(self, m):
91         p = domish.Element(("jabber:client", "presence"))
92         p['from'], p['to'] = m['to'], m['from']
93         presence.addElement("show", content="dnd")
94         presence.addElement("status", content="man at work")
95         self.xmlstream.send(p)
96
97     def message_chat(self, m):
98         n = domish.Element((None, "message"))
99         n['to'], n['from'] = m['from'], m['to']
100         n.addElement("body", content = "don't have time to chat. working!")
101         self.xmlstream.send(n)
102
103     
104 class PubSubClient(BaseClient):
105     """ PubSub (XEP 0060) implementation """
106
107     def __init__(self, id, secret, verbose = False, log = None):
108         BaseClient.__init__(self, id, secret, verbose = verbose, log = log)
109         self.hooks = {}
110     
111     def add_result_hook(self, hook_to, hook):
112         self.hooks[hook_to] = hook
113
114     def delete_result_hook(self, hook_to):
115         if self.hooks.has_key(hook_to):
116             del self.hooks[hook_to]
117
118     def event_authenticated(self, xs):
119         BaseClient.event_authenticated(self, xs)
120         self.requests = {}
121         xs.addObserver("/iq/pubsub/create", self.result_create_node)
122         xs.addObserver("/iq/pubsub/delete", self.result_delete_node)
123         xs.addObserver("/iq/query[@xmlns='http://jabber.org/protocol/disco#items']", self.result_discover)
124
125     def __iq(self, t="get"):
126         iq = domish.Element((None, "iq"))
127         iq['from'] = self.id.full()
128         iq['to'] = "pubsub.vplc27.inria.fr"
129         iq['type'] = t
130         iq.addUniqueId()
131         return iq
132
133     def discover(self, node = None):
134         iq = self.__iq("get")
135         query = iq.addElement("query")
136         query['xmlns'] = "http://jabber.org/protocol/disco#items"
137         if node:
138             query['node'] = node
139         self.requests[iq['id']] = node
140         self.xmlstream.send(iq)
141
142     def result_discover(self, iq):
143         if self.verbose: self.info("Items for node: %s" % self.requests[iq['id']])
144         
145         hook = self.hooks.get('discover', None)
146         for i in iq.query.elements():
147             if self.verbose: self.msg(i.toXml())
148             if hook: hook(i)
149         self.requests.pop(iq['id'])
150
151     def create_node(self, node = None):
152         iq = self.__iq("set")
153         pubsub = iq.addElement("pubsub")
154         pubsub['xmlns'] = "http://jabber.org/protocol/pubsub"
155         create = pubsub.addElement("create")
156         if node:
157             create['node'] = node
158         configure = pubsub.addElement("configure")
159         self.requests[iq['id']] = node
160         self.xmlstream.send(iq)
161
162     def result_create_node(self, iq):
163 #         if hasattr(iq, "error"):
164 #             node = self.requests[iq['id']]
165 #             if hasattr(iq.error, "conflict"):
166 #                 # node is already there, nothing important.
167 #                 self.warn("NodeID exists: %s" % node)
168 #             else:
169 #                 err_type = ""
170 #                 err_name = ""
171 #                 if iq.error:
172 #                     if iq.error.has_key('type'):
173 #                         err_type = iq.error['type']
174 #                     if iq.error.firstChildElement and hasattr(iq.error.firstChildElement, "name"):
175 #                         err_name = iq.error.firstChildElement.name
176 #                 self.error("Can not create node: %s (error type: %s, %s)" %  (node, err_type, err_name))
177         self.requests.pop(iq['id'])
178
179
180     def delete_node(self, node):
181         iq = self.__iq("set")
182         pubsub = iq.addElement("pubsub")
183         pubsub['xmlns'] = "http://jabber.org/protocol/pubsub#owner"
184         delete = pubsub.addElement("delete")
185         delete['node'] = node
186         self.requests[iq['id']] = node
187         self.xmlstream.send(iq)
188
189     def result_delete_node(self, iq):
190         self.requests.pop(iq['id'])
191
192
193
194
195 class Slicemgr(xmlrpc.XMLRPC, PubSubClient):
196     
197     DOMAIN = "/OMF"
198     RESOURCES = 'resources'
199
200     def __init__(self, id, secret, verbose = False, log = None):
201         xmlrpc.XMLRPC.__init__(self, allowNone=True)
202         PubSubClient.__init__(self, id, secret, verbose = verbose, log = log)
203         self.command_queue = Queue.Queue()
204
205         xmlrpc.addIntrospection(self)
206
207     def xmlrpc_createSlice(self, slice):
208         self.create_slice(slice)
209
210     def xmlrpc_addResource(self, slice, resource):
211         self.add_resource(slice, resource)
212
213     def xmlrpc_deleteSlice(self, slice):
214         self.delete_slice(slice)
215
216     def xmlrpc_removeResource(self, slice, resource):
217         self.delete_resource(slice, resource)
218
219
220     def flush_commands(self):
221 #        self.info("Running %d commands" % self.command_queue.qsize())
222         while not self.command_queue.empty():
223             (meth, param) = self.command_queue.get()
224             meth(param)
225
226     def create_slice(self, slice):
227         self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice]) ))
228         self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES]) ))
229
230     def add_resource(self, slice, resource):
231         self.command_queue.put(( self.create_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
232
233     def delete_slice(self, slice):
234         slice_prefix = "/".join([self.DOMAIN,slice])
235         resource_prefix = "/".join([self.DOMAIN,slice,self.RESOURCES])
236         def delete_slice_resources(iq):
237             node = iq['node']
238             if node.startswith(resource_prefix):
239                 self.command_que.put(self.delete_node, node)
240
241         self.add_result_hook("discover", delete_slice_resources)
242         self.discover()
243         self.delete_result_hook("discover")
244
245         self.command_queue.put(( self.delete_node, resource_prefix) )
246         self.command_queue.put(( self.delete_node, slice_prefix) )
247
248     def delete_resource(self, slice, resource):
249         self.command_queue.put(( self.delete_node, "/".join([self.DOMAIN,slice,self.RESOURCES,resource]) ))
250         
251
252
253 if __name__ == "__main__":
254
255     config = Config("/etc/planetlab/plc_config")
256
257     xmppserver = config.PLC_OMF_XMPP_SERVER
258     xmppuser = "@".join([config.PLC_OMF_XMPP_USER, xmppserver])
259     xmpppass = config.PLC_OMF_XMPP_PASSWORD
260     slicemgr = Slicemgr(xmppuser, xmpppass,
261                         log=open("/var/log/omf/pubsub_client.log", "a"),
262                         verbose=True)
263
264     t = task.LoopingCall(slicemgr.flush_commands)
265     t.start(5.0) # check every 5 seconds
266     reactor.connectTCP(slicemgr.id.host, 5222, slicemgr.factory)
267     reactor.listenTCP(5053, server.Site(slicemgr), interface="localhost")
268     reactor.run(installSignalHandlers=True)
269
270
271