python: Style cleanup.
[sliver-openvswitch.git] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18
19 import ovs.json
20 import ovs.poller
21 import ovs.reconnect
22 import ovs.stream
23 import ovs.timeval
24
25 EOF = -1
26
27
28 class Message(object):
29     T_REQUEST = 0               # Request.
30     T_NOTIFY = 1                # Notification.
31     T_REPLY = 2                 # Successful reply.
32     T_ERROR = 3                 # Error reply.
33
34     __types = {T_REQUEST: "request",
35                T_NOTIFY: "notification",
36                T_REPLY: "reply",
37                T_ERROR: "error"}
38
39     def __init__(self, type_, method, params, result, error, id):
40         self.type = type_
41         self.method = method
42         self.params = params
43         self.result = result
44         self.error = error
45         self.id = id
46
47     _next_id = 0
48
49     @staticmethod
50     def _create_id():
51         this_id = Message._next_id
52         Message._next_id += 1
53         return this_id
54
55     @staticmethod
56     def create_request(method, params):
57         return Message(Message.T_REQUEST, method, params, None, None,
58                        Message._create_id())
59
60     @staticmethod
61     def create_notify(method, params):
62         return Message(Message.T_NOTIFY, method, params, None, None,
63                        None)
64
65     @staticmethod
66     def create_reply(result, id):
67         return Message(Message.T_REPLY, None, None, result, None, id)
68
69     @staticmethod
70     def create_error(error, id):
71         return Message(Message.T_ERROR, None, None, None, error, id)
72
73     @staticmethod
74     def type_to_string(type_):
75         return Message.__types[type_]
76
77     def __validate_arg(self, value, name, must_have):
78         if (value is not None) == (must_have != 0):
79             return None
80         else:
81             type_name = Message.type_to_string(self.type)
82             if must_have:
83                 verb = "must"
84             else:
85                 verb = "must not"
86             return "%s %s have \"%s\"" % (type_name, verb, name)
87
88     def is_valid(self):
89         if self.params is not None and type(self.params) != list:
90             return "\"params\" must be JSON array"
91
92         pattern = {Message.T_REQUEST: 0x11001,
93                    Message.T_NOTIFY:  0x11000,
94                    Message.T_REPLY:   0x00101,
95                    Message.T_ERROR:   0x00011}.get(self.type)
96         if pattern is None:
97             return "invalid JSON-RPC message type %s" % self.type
98
99         return (
100             self.__validate_arg(self.method, "method", pattern & 0x10000) or
101             self.__validate_arg(self.params, "params", pattern & 0x1000) or
102             self.__validate_arg(self.result, "result", pattern & 0x100) or
103             self.__validate_arg(self.error, "error", pattern & 0x10) or
104             self.__validate_arg(self.id, "id", pattern & 0x1))
105
106     @staticmethod
107     def from_json(json):
108         if type(json) != dict:
109             return "message is not a JSON object"
110
111         # Make a copy to avoid modifying the caller's dict.
112         json = dict(json)
113
114         if "method" in json:
115             method = json.pop("method")
116             if type(method) not in [str, unicode]:
117                 return "method is not a JSON string"
118         else:
119             method = None
120
121         params = json.pop("params", None)
122         result = json.pop("result", None)
123         error = json.pop("error", None)
124         id_ = json.pop("id", None)
125         if len(json):
126             return "message has unexpected member \"%s\"" % json.popitem()[0]
127
128         if result is not None:
129             msg_type = Message.T_REPLY
130         elif error is not None:
131             msg_type = Message.T_ERROR
132         elif id_ is not None:
133             msg_type = Message.T_REQUEST
134         else:
135             msg_type = Message.T_NOTIFY
136
137         msg = Message(msg_type, method, params, result, error, id_)
138         validation_error = msg.is_valid()
139         if validation_error is not None:
140             return validation_error
141         else:
142             return msg
143
144     def to_json(self):
145         json = {}
146
147         if self.method is not None:
148             json["method"] = self.method
149
150         if self.params is not None:
151             json["params"] = self.params
152
153         if self.result is not None or self.type == Message.T_ERROR:
154             json["result"] = self.result
155
156         if self.error is not None or self.type == Message.T_REPLY:
157             json["error"] = self.error
158
159         if self.id is not None or self.type == Message.T_NOTIFY:
160             json["id"] = self.id
161
162         return json
163
164     def __str__(self):
165         s = [Message.type_to_string(self.type)]
166         if self.method is not None:
167             s.append("method=\"%s\"" % self.method)
168         if self.params is not None:
169             s.append("params=" + ovs.json.to_string(self.params))
170         if self.result is not None:
171             s.append("result=" + ovs.json.to_string(self.result))
172         if self.error is not None:
173             s.append("error=" + ovs.json.to_string(self.error))
174         if self.id is not None:
175             s.append("id=" + ovs.json.to_string(self.id))
176         return ", ".join(s)
177
178
179 class Connection(object):
180     def __init__(self, stream):
181         self.name = stream.name
182         self.stream = stream
183         self.status = 0
184         self.input = ""
185         self.output = ""
186         self.parser = None
187
188     def close(self):
189         self.stream.close()
190         self.stream = None
191
192     def run(self):
193         if self.status:
194             return
195
196         while len(self.output):
197             retval = self.stream.send(self.output)
198             if retval >= 0:
199                 self.output = self.output[retval:]
200             else:
201                 if retval != -errno.EAGAIN:
202                     logging.warn("%s: send error: %s" % (self.name,
203                                                          os.strerror(-retval)))
204                     self.error(-retval)
205                 break
206
207     def wait(self, poller):
208         if not self.status:
209             self.stream.run_wait(poller)
210             if len(self.output):
211                 self.stream.send_wait()
212
213     def get_status(self):
214         return self.status
215
216     def get_backlog(self):
217         if self.status != 0:
218             return 0
219         else:
220             return len(self.output)
221
222     def __log_msg(self, title, msg):
223         logging.debug("%s: %s %s" % (self.name, title, msg))
224
225     def send(self, msg):
226         if self.status:
227             return self.status
228
229         self.__log_msg("send", msg)
230
231         was_empty = len(self.output) == 0
232         self.output += ovs.json.to_string(msg.to_json())
233         if was_empty:
234             self.run()
235         return self.status
236
237     def send_block(self, msg):
238         error = self.send(msg)
239         if error:
240             return error
241
242         while True:
243             self.run()
244             if not self.get_backlog() or self.get_status():
245                 return self.status
246
247             poller = ovs.poller.Poller()
248             self.wait(poller)
249             poller.block()
250
251     def recv(self):
252         if self.status:
253             return self.status, None
254
255         while True:
256             if not self.input:
257                 error, data = self.stream.recv(4096)
258                 if error:
259                     if error == errno.EAGAIN:
260                         return error, None
261                     else:
262                         # XXX rate-limit
263                         logging.warning("%s: receive error: %s"
264                                         % (self.name, os.strerror(error)))
265                         self.error(error)
266                         return self.status, None
267                 elif not data:
268                     self.error(EOF)
269                     return EOF, None
270                 else:
271                     self.input += data
272             else:
273                 if self.parser is None:
274                     self.parser = ovs.json.Parser()
275                 self.input = self.input[self.parser.feed(self.input):]
276                 if self.parser.is_done():
277                     msg = self.__process_msg()
278                     if msg:
279                         return 0, msg
280                     else:
281                         return self.status, None
282
283     def recv_block(self):
284         while True:
285             error, msg = self.recv()
286             if error != errno.EAGAIN:
287                 return error, msg
288
289             self.run()
290
291             poller = ovs.poller.Poller()
292             self.wait(poller)
293             self.recv_wait(poller)
294             poller.block()
295
296     def transact_block(self, request):
297         id_ = request.id
298
299         error = self.send(request)
300         reply = None
301         while not error:
302             error, reply = self.recv_block()
303             if reply and reply.type == Message.T_REPLY and reply.id == id_:
304                 break
305         return error, reply
306
307     def __process_msg(self):
308         json = self.parser.finish()
309         self.parser = None
310         if type(json) in [str, unicode]:
311             # XXX rate-limit
312             logging.warning("%s: error parsing stream: %s" % (self.name, json))
313             self.error(errno.EPROTO)
314             return
315
316         msg = Message.from_json(json)
317         if not isinstance(msg, Message):
318             # XXX rate-limit
319             logging.warning("%s: received bad JSON-RPC message: %s"
320                             % (self.name, msg))
321             self.error(errno.EPROTO)
322             return
323
324         self.__log_msg("received", msg)
325         return msg
326
327     def recv_wait(self, poller):
328         if self.status or self.input:
329             poller.immediate_wake()
330         else:
331             self.stream.recv_wait(poller)
332
333     def error(self, error):
334         if self.status == 0:
335             self.status = error
336             self.stream.close()
337             self.output = ""
338
339
340 class Session(object):
341     """A JSON-RPC session with reconnection."""
342
343     def __init__(self, reconnect, rpc):
344         self.reconnect = reconnect
345         self.rpc = rpc
346         self.stream = None
347         self.pstream = None
348         self.seqno = 0
349
350     @staticmethod
351     def open(name):
352         """Creates and returns a Session that maintains a JSON-RPC session to
353         'name', which should be a string acceptable to ovs.stream.Stream or
354         ovs.stream.PassiveStream's initializer.
355
356         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
357         session connects and reconnects, with back-off, to 'name'.
358
359         If 'name' is a passive connection method, e.g. "ptcp:", the new session
360         listens for connections to 'name'.  It maintains at most one connection
361         at any given time.  Any new connection causes the previous one (if any)
362         to be dropped."""
363         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
364         reconnect.set_name(name)
365         reconnect.enable(ovs.timeval.msec())
366
367         if ovs.stream.PassiveStream.is_valid_name(name):
368             reconnect.set_passive(True, ovs.timeval.msec())
369
370         return Session(reconnect, None)
371
372     @staticmethod
373     def open_unreliably(jsonrpc):
374         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
375         reconnect.set_quiet(True)
376         reconnect.set_name(jsonrpc.name)
377         reconnect.set_max_tries(0)
378         reconnect.connected(ovs.timeval.msec())
379         return Session(reconnect, jsonrpc)
380
381     def close(self):
382         if self.rpc is not None:
383             self.rpc.close()
384             self.rpc = None
385         if self.stream is not None:
386             self.stream.close()
387             self.stream = None
388         if self.pstream is not None:
389             self.pstream.close()
390             self.pstream = None
391
392     def __disconnect(self):
393         if self.rpc is not None:
394             self.rpc.error(EOF)
395             self.rpc.close()
396             self.rpc = None
397             self.seqno += 1
398         elif self.stream is not None:
399             self.stream.close()
400             self.stream = None
401             self.seqno += 1
402
403     def __connect(self):
404         self.__disconnect()
405
406         name = self.reconnect.get_name()
407         if not self.reconnect.is_passive():
408             error, self.stream = ovs.stream.Stream.open(name)
409             if not error:
410                 self.reconnect.connecting(ovs.timeval.msec())
411             else:
412                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
413         elif self.pstream is not None:
414             error, self.pstream = ovs.stream.PassiveStream.open(name)
415             if not error:
416                 self.reconnect.listening(ovs.timeval.msec())
417             else:
418                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
419
420         self.seqno += 1
421
422     def run(self):
423         if self.pstream is not None:
424             error, stream = self.pstream.accept()
425             if error == 0:
426                 if self.rpc or self.stream:
427                     # XXX rate-limit
428                     logging.info("%s: new connection replacing active "
429                                  "connection" % self.reconnect.get_name())
430                     self.__disconnect()
431                 self.reconnect.connected(ovs.timeval.msec())
432                 self.rpc = Connection(stream)
433             elif error != errno.EAGAIN:
434                 self.reconnect.listen_error(ovs.timeval.msec(), error)
435                 self.pstream.close()
436                 self.pstream = None
437
438         if self.rpc:
439             self.rpc.run()
440             error = self.rpc.get_status()
441             if error != 0:
442                 self.reconnect.disconnected(ovs.timeval.msec(), error)
443                 self.__disconnect()
444         elif self.stream is not None:
445             self.stream.run()
446             error = self.stream.connect()
447             if error == 0:
448                 self.reconnect.connected(ovs.timeval.msec())
449                 self.rpc = Connection(self.stream)
450                 self.stream = None
451             elif error != errno.EAGAIN:
452                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
453                 self.stream.close()
454                 self.stream = None
455
456         action = self.reconnect.run(ovs.timeval.msec())
457         if action == ovs.reconnect.CONNECT:
458             self.__connect()
459         elif action == ovs.reconnect.DISCONNECT:
460             self.reconnect.disconnected(ovs.timeval.msec(), 0)
461             self.__disconnect()
462         elif action == ovs.reconnect.PROBE:
463             if self.rpc:
464                 request = Message.create_request("echo", [])
465                 request.id = "echo"
466                 self.rpc.send(request)
467         else:
468             assert action == None
469
470     def wait(self, poller):
471         if self.rpc is not None:
472             self.rpc.wait(poller)
473         elif self.stream is not None:
474             self.stream.run_wait(poller)
475             self.stream.connect_wait(poller)
476         if self.pstream is not None:
477             self.pstream.wait(poller)
478         self.reconnect.wait(poller, ovs.timeval.msec())
479
480     def get_backlog(self):
481         if self.rpc is not None:
482             return self.rpc.get_backlog()
483         else:
484             return 0
485
486     def get_name(self):
487         return self.reconnect.get_name()
488
489     def send(self, msg):
490         if self.rpc is not None:
491             return self.rpc.send(msg)
492         else:
493             return errno.ENOTCONN
494
495     def recv(self):
496         if self.rpc is not None:
497             error, msg = self.rpc.recv()
498             if not error:
499                 self.reconnect.received(ovs.timeval.msec())
500                 if msg.type == Message.T_REQUEST and msg.method == "echo":
501                     # Echo request.  Send reply.
502                     self.send(Message.create_reply(msg.params, msg.id))
503                 elif msg.type == Message.T_REPLY and msg.id == "echo":
504                     # It's a reply to our echo request.  Suppress it.
505                     pass
506                 else:
507                     return msg
508         return None
509
510     def recv_wait(self, poller):
511         if self.rpc is not None:
512             self.rpc.recv_wait(poller)
513
514     def is_alive(self):
515         if self.rpc is not None or self.stream is not None:
516             return True
517         else:
518             max_tries = self.reconnect.get_max_tries()
519             return max_tries is None or max_tries > 0
520
521     def is_connected(self):
522         return self.rpc is not None
523
524     def get_seqno(self):
525         return self.seqno
526
527     def force_reconnect(self):
528         self.reconnect.force_reconnect(ovs.timeval.msec())