Implement initial Python bindings for Open vSwitch database.
[sliver-openvswitch.git] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
import errno
import logging
import os

import ovs.json
import ovs.poller
import ovs.reconnect
import ovs.stream
import ovs.timeval
23
# Pseudo error code (distinct from the positive errno values used elsewhere
# in this module) recorded as a Connection's status when the peer closes the
# stream or when a Session deliberately drops its connection.
EOF = -1
25
class Message(object):
    """A single JSON-RPC message: request, notification, reply, or error.

    Attributes:
        type: one of the T_* constants below.
        method: method name; set for requests and notifications only.
        params: list of parameters; set for requests and notifications only.
        result: result value; set for successful replies only.
        error: error value; set for error replies only.
        id: identifier matching a reply to its request; None for
            notifications.
    """

    T_REQUEST = 0               # Request.
    T_NOTIFY = 1                # Notification.
    T_REPLY = 2                 # Successful reply.
    T_ERROR = 3                 # Error reply.

    __types = {T_REQUEST: "request",
               T_NOTIFY: "notification",
               T_REPLY: "reply",
               T_ERROR: "error"}

    # Next id handed out by create_request(); shared by all messages.
    _next_id = 0

    def __init__(self, type, method, params, result, error, id):
        """Stores the fields verbatim; call is_valid() to check them."""
        self.type = type
        self.method = method
        self.params = params
        self.result = result
        self.error = error
        self.id = id

    @staticmethod
    def _create_id():
        """Returns a fresh request id (a process-wide increasing integer)."""
        this_id = Message._next_id
        Message._next_id += 1
        return this_id

    @staticmethod
    def create_request(method, params):
        """Returns a new request for 'method' with a freshly allocated id."""
        return Message(Message.T_REQUEST, method, params, None, None,
                       Message._create_id())

    @staticmethod
    def create_notify(method, params):
        """Returns a new notification for 'method' (no id, no reply)."""
        return Message(Message.T_NOTIFY, method, params, None, None,
                       None)

    @staticmethod
    def create_reply(result, id):
        """Returns a successful reply carrying 'result' for request 'id'."""
        return Message(Message.T_REPLY, None, None, result, None, id)

    @staticmethod
    def create_error(error, id):
        """Returns an error reply carrying 'error' for request 'id'."""
        return Message(Message.T_ERROR, None, None, None, error, id)

    @staticmethod
    def type_to_string(type):
        """Returns the human-readable name of message type 'type'.

        Raises KeyError if 'type' is not one of the T_* constants."""
        return Message.__types[type]

    def __validate_arg(self, value, name, must_have):
        """Checks that member 'name' is present exactly when required.

        'must_have' is truthy if this message type requires the member and
        falsy if it forbids it.  Returns None if 'value' satisfies that,
        otherwise an error string.

        This must be an instance method: the error text names the message
        type, which is read from self."""
        if (value is not None) == bool(must_have):
            return None

        type_name = Message.type_to_string(self.type)
        if must_have:
            verb = "must"
        else:
            verb = "must not"
        return "%s %s have \"%s\"" % (type_name, verb, name)

    def is_valid(self):
        """Returns None if this message is well-formed for its type,
        otherwise a string describing the first problem found."""
        if self.params is not None and not isinstance(self.params, list):
            return "\"params\" must be JSON array"

        # One hex digit per member, most significant first:
        # method, params, result, error, id.  1 = required, 0 = forbidden.
        pattern = {Message.T_REQUEST: 0x11001,
                   Message.T_NOTIFY:  0x11000,
                   Message.T_REPLY:   0x00101,
                   Message.T_ERROR:   0x00011}.get(self.type)
        if pattern is None:
            return "invalid JSON-RPC message type %s" % self.type

        return (
            self.__validate_arg(self.method, "method", pattern & 0x10000) or
            self.__validate_arg(self.params, "params", pattern & 0x1000) or
            self.__validate_arg(self.result, "result", pattern & 0x100) or
            self.__validate_arg(self.error, "error", pattern & 0x10) or
            self.__validate_arg(self.id, "id", pattern & 0x1))

    @staticmethod
    def from_json(json):
        """Converts parsed JSON value 'json' into a Message.

        Returns the Message on success, otherwise a string describing why
        'json' is not a valid JSON-RPC message.  'json' is not modified."""
        if not isinstance(json, dict):
            return "message is not a JSON object"

        # Make a copy to avoid modifying the caller's dict.
        json = dict(json)

        if "method" in json:
            method = json.pop("method")
            if not isinstance(method, (str, unicode)):
                return "method is not a JSON string"
        else:
            method = None

        params = json.pop("params", None)
        result = json.pop("result", None)
        error = json.pop("error", None)
        id = json.pop("id", None)
        if json:
            return "message has unexpected member \"%s\"" % json.popitem()[0]

        # Infer the type from which members are non-null.
        if result is not None:
            msg_type = Message.T_REPLY
        elif error is not None:
            msg_type = Message.T_ERROR
        elif id is not None:
            msg_type = Message.T_REQUEST
        else:
            msg_type = Message.T_NOTIFY

        msg = Message(msg_type, method, params, result, error, id)
        validation_error = msg.is_valid()
        if validation_error is not None:
            return validation_error
        return msg

    def to_json(self):
        """Returns this message as a dict suitable for JSON serialization.

        Members that are None are omitted, with three exceptions where a
        null member is emitted explicitly: "result" in an error reply,
        "error" in a successful reply, and "id" in a notification."""
        json = {}

        if self.method is not None:
            json["method"] = self.method

        if self.params is not None:
            json["params"] = self.params

        if self.result is not None or self.type == Message.T_ERROR:
            json["result"] = self.result

        if self.error is not None or self.type == Message.T_REPLY:
            json["error"] = self.error

        if self.id is not None or self.type == Message.T_NOTIFY:
            json["id"] = self.id

        return json

    def __str__(self):
        """Returns a one-line description listing every member that is set."""
        s = [Message.type_to_string(self.type)]
        if self.method is not None:
            s.append("method=\"%s\"" % self.method)
        if self.params is not None:
            s.append("params=" + ovs.json.to_string(self.params))
        if self.result is not None:
            s.append("result=" + ovs.json.to_string(self.result))
        if self.error is not None:
            s.append("error=" + ovs.json.to_string(self.error))
        if self.id is not None:
            s.append("id=" + ovs.json.to_string(self.id))
        return ", ".join(s)
174
class Connection(object):
    """A JSON-RPC connection layered on a single stream, without
    reconnection.

    'status' is 0 while the connection is usable.  Once an error (or EOF)
    has been recorded it holds that value permanently, and every operation
    fails fast with it."""

    def __init__(self, stream):
        """Wraps 'stream', taking the connection's name from it."""
        self.name = stream.get_name()
        self.stream = stream
        self.status = 0
        self.input = ""         # Received text not yet fed to the parser.
        self.output = ""        # Serialized messages not yet sent.
        self.parser = None      # JSON parser for the message in progress.

    def close(self):
        """Closes the underlying stream.  Safe to call more than once."""
        if self.stream is not None:
            self.stream.close()
            self.stream = None

    def run(self):
        """Sends as much queued output as the stream accepts right now."""
        if self.status:
            return

        while self.output:
            retval = self.stream.send(self.output)
            if retval >= 0:
                # 'retval' bytes were accepted; keep the remainder queued.
                self.output = self.output[retval:]
                continue

            # A negative return value is a negated errno value.
            if retval != -errno.EAGAIN:
                logging.warning("%s: send error: %s",
                                self.name, os.strerror(-retval))
                self.error(-retval)
            break

    def wait(self, poller):
        """Arranges for 'poller' to wake up when run() has work to do."""
        if not self.status:
            self.stream.run_wait(poller)
            if self.output:
                self.stream.send_wait(poller)

    def get_status(self):
        """Returns 0 if the connection is healthy, otherwise the error
        (a positive errno value or EOF) that broke it."""
        return self.status

    def get_backlog(self):
        """Returns the number of queued, unsent bytes (0 once broken)."""
        if self.status != 0:
            return 0
        return len(self.output)

    def get_name(self):
        """Returns the name obtained from the stream at construction."""
        return self.name

    def __log_msg(self, title, msg):
        """Logs 'msg' at debug level, prefixed by our name and 'title'."""
        logging.debug("%s: %s %s", self.name, title, msg)

    def send(self, msg):
        """Queues Message 'msg' for transmission, without blocking.

        Returns the connection status: 0 if 'msg' was queued (not
        necessarily sent yet), otherwise the recorded error."""
        if self.status:
            return self.status

        self.__log_msg("send", msg)

        was_empty = not self.output
        self.output += ovs.json.to_string(msg.to_json())
        if was_empty:
            # Nothing was pending, so an immediate send attempt is worthwhile.
            self.run()
        return self.status

    def send_block(self, msg):
        """Queues 'msg' and blocks until it has been fully sent or the
        connection fails.  Returns the connection status."""
        error = self.send(msg)
        if error:
            return error

        while True:
            self.run()
            if not self.get_backlog() or self.get_status():
                return self.status

            poller = ovs.poller.Poller()
            self.wait(poller)
            poller.block()

    def recv(self):
        """Tries to receive one message without blocking.

        Returns a pair (error, msg):
          - (0, Message) if a complete, valid message was received;
          - (errno.EAGAIN, None) if the stream has no more data for now;
          - (EOF, None) if the peer closed the stream;
          - (status, None) if the connection is, or just became, broken."""
        if self.status:
            return self.status, None

        while True:
            if not self.input:
                error, data = self.stream.recv(4096)
                if error:
                    if error == errno.EAGAIN:
                        return error, None
                    # XXX rate-limit
                    logging.warning("%s: receive error: %s",
                                    self.name, os.strerror(error))
                    self.error(error)
                    return self.status, None
                elif not data:
                    self.error(EOF)
                    return EOF, None
                else:
                    self.input += data
            else:
                if self.parser is None:
                    self.parser = ovs.json.Parser()
                # feed() reports how much it consumed; keep the rest for
                # the next message.
                consumed = self.parser.feed(self.input)
                self.input = self.input[consumed:]
                if self.parser.is_done():
                    msg = self.__process_msg()
                    if msg:
                        return 0, msg
                    else:
                        return self.status, None

    def recv_block(self):
        """Like recv(), but blocks instead of returning errno.EAGAIN."""
        while True:
            error, msg = self.recv()
            if error != errno.EAGAIN:
                return error, msg

            # Keep flushing output while waiting for input.
            self.run()

            poller = ovs.poller.Poller()
            self.wait(poller)
            self.recv_wait(poller)
            poller.block()

    def transact_block(self, request):
        """Sends 'request' and blocks until the matching reply arrives.

        Messages that are not a T_REPLY with the request's id are discarded
        while waiting.  Returns (error, reply); 'reply' is only meaningful
        when 'error' is 0."""
        request_id = request.id

        error = self.send(request)
        reply = None
        while not error:
            error, reply = self.recv_block()
            if (reply and reply.type == Message.T_REPLY
                    and reply.id == request_id):
                break
        return error, reply

    def __process_msg(self):
        """Turns the parser's finished JSON value into a Message.

        Returns the Message, or None after marking the connection broken
        with errno.EPROTO if the input was not a valid JSON-RPC message."""
        json = self.parser.finish()
        self.parser = None
        if isinstance(json, (str, unicode)):
            # The parser reports a syntax error as a string.
            # XXX rate-limit
            logging.warning("%s: error parsing stream: %s", self.name, json)
            self.error(errno.EPROTO)
            return

        msg = Message.from_json(json)
        if not isinstance(msg, Message):
            # XXX rate-limit
            logging.warning("%s: received bad JSON-RPC message: %s",
                            self.name, msg)
            self.error(errno.EPROTO)
            return

        self.__log_msg("received", msg)
        return msg

    def recv_wait(self, poller):
        """Arranges for 'poller' to wake up when recv() may make progress."""
        if self.status or self.input:
            poller.immediate_wake()
        else:
            self.stream.recv_wait(poller)

    def error(self, error):
        """Marks the connection broken with 'error', closes the stream and
        drops unsent output.  Only the first error is recorded."""
        if self.status == 0:
            self.status = error
            if self.stream is not None:
                self.stream.close()
            self.output = ""
337             
class Session(object):
    """A JSON-RPC session with reconnection.

    Attributes:
        reconnect: state machine deciding when to connect, probe and
            disconnect.
        rpc: the established Connection, or None.
        stream: an outgoing stream whose connection is still in progress,
            or None.
        pstream: the listening passive stream (passive sessions), or None.
        seqno: counter bumped on each connect attempt and each disconnect,
            so callers can notice that the connection changed.
    """

    def __init__(self, reconnect, rpc):
        """Use open() or open_unreliably() rather than calling directly."""
        self.reconnect = reconnect
        self.rpc = rpc
        self.stream = None
        self.pstream = None
        self.seqno = 0

    @staticmethod
    def open(name):
        """Creates and returns a Session that maintains a JSON-RPC session to
        'name', which should be a string acceptable to ovs.stream.Stream or
        ovs.stream.PassiveStream's initializer.

        If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
        session connects and reconnects, with back-off, to 'name'.

        If 'name' is a passive connection method, e.g. "ptcp:", the new session
        listens for connections to 'name'.  It maintains at most one connection
        at any given time.  Any new connection causes the previous one (if any)
        to be dropped."""
        reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
        reconnect.set_name(name)
        reconnect.enable(ovs.timeval.msec())

        if ovs.stream.PassiveStream.is_valid_name(name):
            # This is a static method, so configure the local object; there
            # is no session (and no 'self') yet.
            reconnect.set_passive(True, ovs.timeval.msec())

        return Session(reconnect, None)

    @staticmethod
    def open_unreliably(jsonrpc):
        """Creates and returns a Session around the existing Connection
        'jsonrpc'.  The reconnect object is marked connected and given a
        maximum of 0 connection attempts."""
        reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
        reconnect.set_quiet(True)
        reconnect.set_name(jsonrpc.get_name())
        reconnect.set_max_tries(0)
        reconnect.connected(ovs.timeval.msec())
        return Session(reconnect, jsonrpc)

    def close(self):
        """Closes the connection, any pending stream and any listener."""
        if self.rpc is not None:
            self.rpc.close()
            self.rpc = None
        if self.stream is not None:
            self.stream.close()
            self.stream = None
        if self.pstream is not None:
            self.pstream.close()
            self.pstream = None

    def __disconnect(self):
        """Drops the current connection or in-progress stream, if any, and
        bumps seqno when something was dropped.  The listener is kept."""
        if self.rpc is not None:
            self.rpc.error(EOF)
            self.rpc.close()
            self.rpc = None
            self.seqno += 1
        elif self.stream is not None:
            self.stream.close()
            self.stream = None
            self.seqno += 1

    def __connect(self):
        """Starts a new connection attempt (active sessions) or makes sure
        a listener is open (passive sessions), and bumps seqno."""
        self.__disconnect()

        name = self.reconnect.get_name()
        if not self.reconnect.is_passive():
            error, self.stream = ovs.stream.Stream.open(name)
            if not error:
                self.reconnect.connecting(ovs.timeval.msec())
            else:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)
        else:
            if self.pstream is None:
                error, self.pstream = ovs.stream.PassiveStream.open(name)
            else:
                # Already listening: keep the existing listener instead of
                # opening a second one and leaking the first.
                error = 0

            if not error:
                self.reconnect.listening(ovs.timeval.msec())
            else:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)

        self.seqno += 1

    def run(self):
        """Performs periodic work: accepts incoming connections, advances
        the connection or connection attempt, and carries out whatever the
        reconnect state machine asks for."""
        if self.pstream is not None:
            error, stream = self.pstream.accept()
            if error == 0:
                if self.rpc or self.stream:
                    # XXX rate-limit
                    logging.info("%s: new connection replacing active "
                                 "connection" % self.reconnect.get_name())
                    self.__disconnect()
                self.reconnect.connected(ovs.timeval.msec())
                self.rpc = Connection(stream)
            elif error != errno.EAGAIN:
                self.reconnect.listen_error(ovs.timeval.msec(), error)
                self.pstream.close()
                self.pstream = None

        if self.rpc:
            self.rpc.run()
            error = self.rpc.get_status()
            if error != 0:
                self.reconnect.disconnected(ovs.timeval.msec(), error)
                self.__disconnect()
        elif self.stream is not None:
            self.stream.run()
            error = self.stream.connect()
            if error == 0:
                # Connection completed: promote the stream to a Connection.
                self.reconnect.connected(ovs.timeval.msec())
                self.rpc = Connection(self.stream)
                self.stream = None
            elif error != errno.EAGAIN:
                self.reconnect.connect_failed(ovs.timeval.msec(), error)
                self.stream.close()
                self.stream = None

        action = self.reconnect.run(ovs.timeval.msec())
        if action == ovs.reconnect.CONNECT:
            self.__connect()
        elif action == ovs.reconnect.DISCONNECT:
            self.reconnect.disconnected(ovs.timeval.msec(), 0)
            self.__disconnect()
        elif action == ovs.reconnect.PROBE:
            if self.rpc:
                # The fixed id "echo" lets recv() recognize and suppress
                # the reply to this probe.
                request = Message.create_request("echo", [])
                request.id = "echo"
                self.rpc.send(request)
        else:
            assert action is None

    def wait(self, poller):
        """Arranges for 'poller' to wake up when run() has work to do."""
        if self.rpc is not None:
            self.rpc.wait(poller)
        elif self.stream is not None:
            self.stream.run_wait(poller)
            self.stream.connect_wait(poller)
        if self.pstream is not None:
            self.pstream.wait(poller)
        self.reconnect.wait(poller, ovs.timeval.msec())

    def get_backlog(self):
        """Returns the number of unsent bytes, or 0 if not connected."""
        if self.rpc is not None:
            return self.rpc.get_backlog()
        return 0

    def get_name(self):
        """Returns the name this session connects or listens to."""
        return self.reconnect.get_name()

    def send(self, msg):
        """Queues 'msg' on the current connection.

        Returns the connection's status, or errno.ENOTCONN if there is no
        connection at the moment."""
        if self.rpc is not None:
            return self.rpc.send(msg)
        return errno.ENOTCONN

    def recv(self):
        """Returns the next message of interest to the caller, or None.

        Echo requests are answered and replies to our own echo probes are
        swallowed here; None is returned for both."""
        if self.rpc is not None:
            error, msg = self.rpc.recv()
            if not error:
                self.reconnect.received(ovs.timeval.msec())
                if msg.type == Message.T_REQUEST and msg.method == "echo":
                    # Echo request.  Send reply.
                    self.send(Message.create_reply(msg.params, msg.id))
                elif msg.type == Message.T_REPLY and msg.id == "echo":
                    # It's a reply to our echo request.  Suppress it.
                    pass
                else:
                    return msg
        return None

    def recv_wait(self, poller):
        """Arranges for 'poller' to wake up when recv() may return a
        message.  Does nothing while disconnected."""
        if self.rpc is not None:
            self.rpc.recv_wait(poller)

    def is_alive(self):
        """Returns True if the session is connected, is connecting, or may
        still try to connect (max_tries is unlimited or positive)."""
        if self.rpc is not None or self.stream is not None:
            return True
        max_tries = self.reconnect.get_max_tries()
        return max_tries is None or max_tries > 0

    def is_connected(self):
        """Returns True if a connection is currently established."""
        return self.rpc is not None

    def get_seqno(self):
        """Returns the counter that changes on every connect attempt and
        every disconnect."""
        return self.seqno

    def force_reconnect(self):
        """Asks the reconnect state machine to reconnect."""
        self.reconnect.force_reconnect(ovs.timeval.msec())