Global replace of Nicira Networks.
[sliver-openvswitch.git] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010, 2011, 2012 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17
18 import ovs.json
19 import ovs.poller
20 import ovs.reconnect
21 import ovs.stream
22 import ovs.timeval
23 import ovs.util
24 import ovs.vlog
25
26 EOF = ovs.util.EOF
27 vlog = ovs.vlog.Vlog("jsonrpc")
28
29
30 class Message(object):
31     T_REQUEST = 0               # Request.
32     T_NOTIFY = 1                # Notification.
33     T_REPLY = 2                 # Successful reply.
34     T_ERROR = 3                 # Error reply.
35
36     __types = {T_REQUEST: "request",
37                T_NOTIFY: "notification",
38                T_REPLY: "reply",
39                T_ERROR: "error"}
40
41     def __init__(self, type_, method, params, result, error, id):
42         self.type = type_
43         self.method = method
44         self.params = params
45         self.result = result
46         self.error = error
47         self.id = id
48
49     _next_id = 0
50
51     @staticmethod
52     def _create_id():
53         this_id = Message._next_id
54         Message._next_id += 1
55         return this_id
56
57     @staticmethod
58     def create_request(method, params):
59         return Message(Message.T_REQUEST, method, params, None, None,
60                        Message._create_id())
61
62     @staticmethod
63     def create_notify(method, params):
64         return Message(Message.T_NOTIFY, method, params, None, None,
65                        None)
66
67     @staticmethod
68     def create_reply(result, id):
69         return Message(Message.T_REPLY, None, None, result, None, id)
70
71     @staticmethod
72     def create_error(error, id):
73         return Message(Message.T_ERROR, None, None, None, error, id)
74
75     @staticmethod
76     def type_to_string(type_):
77         return Message.__types[type_]
78
79     def __validate_arg(self, value, name, must_have):
80         if (value is not None) == (must_have != 0):
81             return None
82         else:
83             type_name = Message.type_to_string(self.type)
84             if must_have:
85                 verb = "must"
86             else:
87                 verb = "must not"
88             return "%s %s have \"%s\"" % (type_name, verb, name)
89
90     def is_valid(self):
91         if self.params is not None and type(self.params) != list:
92             return "\"params\" must be JSON array"
93
94         pattern = {Message.T_REQUEST: 0x11001,
95                    Message.T_NOTIFY:  0x11000,
96                    Message.T_REPLY:   0x00101,
97                    Message.T_ERROR:   0x00011}.get(self.type)
98         if pattern is None:
99             return "invalid JSON-RPC message type %s" % self.type
100
101         return (
102             self.__validate_arg(self.method, "method", pattern & 0x10000) or
103             self.__validate_arg(self.params, "params", pattern & 0x1000) or
104             self.__validate_arg(self.result, "result", pattern & 0x100) or
105             self.__validate_arg(self.error, "error", pattern & 0x10) or
106             self.__validate_arg(self.id, "id", pattern & 0x1))
107
108     @staticmethod
109     def from_json(json):
110         if type(json) != dict:
111             return "message is not a JSON object"
112
113         # Make a copy to avoid modifying the caller's dict.
114         json = dict(json)
115
116         if "method" in json:
117             method = json.pop("method")
118             if type(method) not in [str, unicode]:
119                 return "method is not a JSON string"
120         else:
121             method = None
122
123         params = json.pop("params", None)
124         result = json.pop("result", None)
125         error = json.pop("error", None)
126         id_ = json.pop("id", None)
127         if len(json):
128             return "message has unexpected member \"%s\"" % json.popitem()[0]
129
130         if result is not None:
131             msg_type = Message.T_REPLY
132         elif error is not None:
133             msg_type = Message.T_ERROR
134         elif id_ is not None:
135             msg_type = Message.T_REQUEST
136         else:
137             msg_type = Message.T_NOTIFY
138
139         msg = Message(msg_type, method, params, result, error, id_)
140         validation_error = msg.is_valid()
141         if validation_error is not None:
142             return validation_error
143         else:
144             return msg
145
146     def to_json(self):
147         json = {}
148
149         if self.method is not None:
150             json["method"] = self.method
151
152         if self.params is not None:
153             json["params"] = self.params
154
155         if self.result is not None or self.type == Message.T_ERROR:
156             json["result"] = self.result
157
158         if self.error is not None or self.type == Message.T_REPLY:
159             json["error"] = self.error
160
161         if self.id is not None or self.type == Message.T_NOTIFY:
162             json["id"] = self.id
163
164         return json
165
166     def __str__(self):
167         s = [Message.type_to_string(self.type)]
168         if self.method is not None:
169             s.append("method=\"%s\"" % self.method)
170         if self.params is not None:
171             s.append("params=" + ovs.json.to_string(self.params))
172         if self.result is not None:
173             s.append("result=" + ovs.json.to_string(self.result))
174         if self.error is not None:
175             s.append("error=" + ovs.json.to_string(self.error))
176         if self.id is not None:
177             s.append("id=" + ovs.json.to_string(self.id))
178         return ", ".join(s)
179
180
181 class Connection(object):
182     def __init__(self, stream):
183         self.name = stream.name
184         self.stream = stream
185         self.status = 0
186         self.input = ""
187         self.output = ""
188         self.parser = None
189
190     def close(self):
191         self.stream.close()
192         self.stream = None
193
194     def run(self):
195         if self.status:
196             return
197
198         while len(self.output):
199             retval = self.stream.send(self.output)
200             if retval >= 0:
201                 self.output = self.output[retval:]
202             else:
203                 if retval != -errno.EAGAIN:
204                     vlog.warn("%s: send error: %s" %
205                               (self.name, os.strerror(-retval)))
206                     self.error(-retval)
207                 break
208
209     def wait(self, poller):
210         if not self.status:
211             self.stream.run_wait(poller)
212             if len(self.output):
213                 self.stream.send_wait()
214
215     def get_status(self):
216         return self.status
217
218     def get_backlog(self):
219         if self.status != 0:
220             return 0
221         else:
222             return len(self.output)
223
224     def __log_msg(self, title, msg):
225         vlog.dbg("%s: %s %s" % (self.name, title, msg))
226
227     def send(self, msg):
228         if self.status:
229             return self.status
230
231         self.__log_msg("send", msg)
232
233         was_empty = len(self.output) == 0
234         self.output += ovs.json.to_string(msg.to_json())
235         if was_empty:
236             self.run()
237         return self.status
238
239     def send_block(self, msg):
240         error = self.send(msg)
241         if error:
242             return error
243
244         while True:
245             self.run()
246             if not self.get_backlog() or self.get_status():
247                 return self.status
248
249             poller = ovs.poller.Poller()
250             self.wait(poller)
251             poller.block()
252
253     def recv(self):
254         if self.status:
255             return self.status, None
256
257         while True:
258             if not self.input:
259                 error, data = self.stream.recv(4096)
260                 if error:
261                     if error == errno.EAGAIN:
262                         return error, None
263                     else:
264                         # XXX rate-limit
265                         vlog.warn("%s: receive error: %s"
266                                   % (self.name, os.strerror(error)))
267                         self.error(error)
268                         return self.status, None
269                 elif not data:
270                     self.error(EOF)
271                     return EOF, None
272                 else:
273                     self.input += data
274             else:
275                 if self.parser is None:
276                     self.parser = ovs.json.Parser()
277                 self.input = self.input[self.parser.feed(self.input):]
278                 if self.parser.is_done():
279                     msg = self.__process_msg()
280                     if msg:
281                         return 0, msg
282                     else:
283                         return self.status, None
284
285     def recv_block(self):
286         while True:
287             error, msg = self.recv()
288             if error != errno.EAGAIN:
289                 return error, msg
290
291             self.run()
292
293             poller = ovs.poller.Poller()
294             self.wait(poller)
295             self.recv_wait(poller)
296             poller.block()
297
298     def transact_block(self, request):
299         id_ = request.id
300
301         error = self.send(request)
302         reply = None
303         while not error:
304             error, reply = self.recv_block()
305             if (reply
306                 and (reply.type == Message.T_REPLY
307                      or reply.type == Message.T_ERROR)
308                 and reply.id == id_):
309                 break
310         return error, reply
311
312     def __process_msg(self):
313         json = self.parser.finish()
314         self.parser = None
315         if type(json) in [str, unicode]:
316             # XXX rate-limit
317             vlog.warn("%s: error parsing stream: %s" % (self.name, json))
318             self.error(errno.EPROTO)
319             return
320
321         msg = Message.from_json(json)
322         if not isinstance(msg, Message):
323             # XXX rate-limit
324             vlog.warn("%s: received bad JSON-RPC message: %s"
325                       % (self.name, msg))
326             self.error(errno.EPROTO)
327             return
328
329         self.__log_msg("received", msg)
330         return msg
331
332     def recv_wait(self, poller):
333         if self.status or self.input:
334             poller.immediate_wake()
335         else:
336             self.stream.recv_wait(poller)
337
338     def error(self, error):
339         if self.status == 0:
340             self.status = error
341             self.stream.close()
342             self.output = ""
343
344
345 class Session(object):
346     """A JSON-RPC session with reconnection."""
347
348     def __init__(self, reconnect, rpc):
349         self.reconnect = reconnect
350         self.rpc = rpc
351         self.stream = None
352         self.pstream = None
353         self.seqno = 0
354
355     @staticmethod
356     def open(name):
357         """Creates and returns a Session that maintains a JSON-RPC session to
358         'name', which should be a string acceptable to ovs.stream.Stream or
359         ovs.stream.PassiveStream's initializer.
360
361         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
362         session connects and reconnects, with back-off, to 'name'.
363
364         If 'name' is a passive connection method, e.g. "ptcp:", the new session
365         listens for connections to 'name'.  It maintains at most one connection
366         at any given time.  Any new connection causes the previous one (if any)
367         to be dropped."""
368         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
369         reconnect.set_name(name)
370         reconnect.enable(ovs.timeval.msec())
371
372         if ovs.stream.PassiveStream.is_valid_name(name):
373             reconnect.set_passive(True, ovs.timeval.msec())
374
375         if ovs.stream.stream_or_pstream_needs_probes(name):
376             reconnect.set_probe_interval(0)
377
378         return Session(reconnect, None)
379
380     @staticmethod
381     def open_unreliably(jsonrpc):
382         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
383         reconnect.set_quiet(True)
384         reconnect.set_name(jsonrpc.name)
385         reconnect.set_max_tries(0)
386         reconnect.connected(ovs.timeval.msec())
387         return Session(reconnect, jsonrpc)
388
389     def close(self):
390         if self.rpc is not None:
391             self.rpc.close()
392             self.rpc = None
393         if self.stream is not None:
394             self.stream.close()
395             self.stream = None
396         if self.pstream is not None:
397             self.pstream.close()
398             self.pstream = None
399
400     def __disconnect(self):
401         if self.rpc is not None:
402             self.rpc.error(EOF)
403             self.rpc.close()
404             self.rpc = None
405             self.seqno += 1
406         elif self.stream is not None:
407             self.stream.close()
408             self.stream = None
409             self.seqno += 1
410
411     def __connect(self):
412         self.__disconnect()
413
414         name = self.reconnect.get_name()
415         if not self.reconnect.is_passive():
416             error, self.stream = ovs.stream.Stream.open(name)
417             if not error:
418                 self.reconnect.connecting(ovs.timeval.msec())
419             else:
420                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
421         elif self.pstream is not None:
422             error, self.pstream = ovs.stream.PassiveStream.open(name)
423             if not error:
424                 self.reconnect.listening(ovs.timeval.msec())
425             else:
426                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
427
428         self.seqno += 1
429
430     def run(self):
431         if self.pstream is not None:
432             error, stream = self.pstream.accept()
433             if error == 0:
434                 if self.rpc or self.stream:
435                     # XXX rate-limit
436                     vlog.info("%s: new connection replacing active "
437                               "connection" % self.reconnect.get_name())
438                     self.__disconnect()
439                 self.reconnect.connected(ovs.timeval.msec())
440                 self.rpc = Connection(stream)
441             elif error != errno.EAGAIN:
442                 self.reconnect.listen_error(ovs.timeval.msec(), error)
443                 self.pstream.close()
444                 self.pstream = None
445
446         if self.rpc:
447             self.rpc.run()
448             error = self.rpc.get_status()
449             if error != 0:
450                 self.reconnect.disconnected(ovs.timeval.msec(), error)
451                 self.__disconnect()
452         elif self.stream is not None:
453             self.stream.run()
454             error = self.stream.connect()
455             if error == 0:
456                 self.reconnect.connected(ovs.timeval.msec())
457                 self.rpc = Connection(self.stream)
458                 self.stream = None
459             elif error != errno.EAGAIN:
460                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
461                 self.stream.close()
462                 self.stream = None
463
464         action = self.reconnect.run(ovs.timeval.msec())
465         if action == ovs.reconnect.CONNECT:
466             self.__connect()
467         elif action == ovs.reconnect.DISCONNECT:
468             self.reconnect.disconnected(ovs.timeval.msec(), 0)
469             self.__disconnect()
470         elif action == ovs.reconnect.PROBE:
471             if self.rpc:
472                 request = Message.create_request("echo", [])
473                 request.id = "echo"
474                 self.rpc.send(request)
475         else:
476             assert action == None
477
478     def wait(self, poller):
479         if self.rpc is not None:
480             self.rpc.wait(poller)
481         elif self.stream is not None:
482             self.stream.run_wait(poller)
483             self.stream.connect_wait(poller)
484         if self.pstream is not None:
485             self.pstream.wait(poller)
486         self.reconnect.wait(poller, ovs.timeval.msec())
487
488     def get_backlog(self):
489         if self.rpc is not None:
490             return self.rpc.get_backlog()
491         else:
492             return 0
493
494     def get_name(self):
495         return self.reconnect.get_name()
496
497     def send(self, msg):
498         if self.rpc is not None:
499             return self.rpc.send(msg)
500         else:
501             return errno.ENOTCONN
502
503     def recv(self):
504         if self.rpc is not None:
505             error, msg = self.rpc.recv()
506             if not error:
507                 self.reconnect.received(ovs.timeval.msec())
508                 if msg.type == Message.T_REQUEST and msg.method == "echo":
509                     # Echo request.  Send reply.
510                     self.send(Message.create_reply(msg.params, msg.id))
511                 elif msg.type == Message.T_REPLY and msg.id == "echo":
512                     # It's a reply to our echo request.  Suppress it.
513                     pass
514                 else:
515                     return msg
516         return None
517
518     def recv_wait(self, poller):
519         if self.rpc is not None:
520             self.rpc.recv_wait(poller)
521
522     def is_alive(self):
523         if self.rpc is not None or self.stream is not None:
524             return True
525         else:
526             max_tries = self.reconnect.get_max_tries()
527             return max_tries is None or max_tries > 0
528
529     def is_connected(self):
530         return self.rpc is not None
531
532     def get_seqno(self):
533         return self.seqno
534
535     def force_reconnect(self):
536         self.reconnect.force_reconnect(ovs.timeval.msec())