python: Add ovs_error() helper function to Python.
[sliver-openvswitch.git] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010, 2011, 2012 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import os
17
18 import ovs.json
19 import ovs.poller
20 import ovs.reconnect
21 import ovs.stream
22 import ovs.timeval
23 import ovs.util
24 import ovs.vlog
25
26 EOF = ovs.util.EOF
27 vlog = ovs.vlog.Vlog("jsonrpc")
28
29
30 class Message(object):
31     T_REQUEST = 0               # Request.
32     T_NOTIFY = 1                # Notification.
33     T_REPLY = 2                 # Successful reply.
34     T_ERROR = 3                 # Error reply.
35
36     __types = {T_REQUEST: "request",
37                T_NOTIFY: "notification",
38                T_REPLY: "reply",
39                T_ERROR: "error"}
40
41     def __init__(self, type_, method, params, result, error, id):
42         self.type = type_
43         self.method = method
44         self.params = params
45         self.result = result
46         self.error = error
47         self.id = id
48
49     _next_id = 0
50
51     @staticmethod
52     def _create_id():
53         this_id = Message._next_id
54         Message._next_id += 1
55         return this_id
56
57     @staticmethod
58     def create_request(method, params):
59         return Message(Message.T_REQUEST, method, params, None, None,
60                        Message._create_id())
61
62     @staticmethod
63     def create_notify(method, params):
64         return Message(Message.T_NOTIFY, method, params, None, None,
65                        None)
66
67     @staticmethod
68     def create_reply(result, id):
69         return Message(Message.T_REPLY, None, None, result, None, id)
70
71     @staticmethod
72     def create_error(error, id):
73         return Message(Message.T_ERROR, None, None, None, error, id)
74
75     @staticmethod
76     def type_to_string(type_):
77         return Message.__types[type_]
78
79     def __validate_arg(self, value, name, must_have):
80         if (value is not None) == (must_have != 0):
81             return None
82         else:
83             type_name = Message.type_to_string(self.type)
84             if must_have:
85                 verb = "must"
86             else:
87                 verb = "must not"
88             return "%s %s have \"%s\"" % (type_name, verb, name)
89
90     def is_valid(self):
91         if self.params is not None and type(self.params) != list:
92             return "\"params\" must be JSON array"
93
94         pattern = {Message.T_REQUEST: 0x11001,
95                    Message.T_NOTIFY:  0x11000,
96                    Message.T_REPLY:   0x00101,
97                    Message.T_ERROR:   0x00011}.get(self.type)
98         if pattern is None:
99             return "invalid JSON-RPC message type %s" % self.type
100
101         return (
102             self.__validate_arg(self.method, "method", pattern & 0x10000) or
103             self.__validate_arg(self.params, "params", pattern & 0x1000) or
104             self.__validate_arg(self.result, "result", pattern & 0x100) or
105             self.__validate_arg(self.error, "error", pattern & 0x10) or
106             self.__validate_arg(self.id, "id", pattern & 0x1))
107
108     @staticmethod
109     def from_json(json):
110         if type(json) != dict:
111             return "message is not a JSON object"
112
113         # Make a copy to avoid modifying the caller's dict.
114         json = dict(json)
115
116         if "method" in json:
117             method = json.pop("method")
118             if type(method) not in [str, unicode]:
119                 return "method is not a JSON string"
120         else:
121             method = None
122
123         params = json.pop("params", None)
124         result = json.pop("result", None)
125         error = json.pop("error", None)
126         id_ = json.pop("id", None)
127         if len(json):
128             return "message has unexpected member \"%s\"" % json.popitem()[0]
129
130         if result is not None:
131             msg_type = Message.T_REPLY
132         elif error is not None:
133             msg_type = Message.T_ERROR
134         elif id_ is not None:
135             msg_type = Message.T_REQUEST
136         else:
137             msg_type = Message.T_NOTIFY
138
139         msg = Message(msg_type, method, params, result, error, id_)
140         validation_error = msg.is_valid()
141         if validation_error is not None:
142             return validation_error
143         else:
144             return msg
145
146     def to_json(self):
147         json = {}
148
149         if self.method is not None:
150             json["method"] = self.method
151
152         if self.params is not None:
153             json["params"] = self.params
154
155         if self.result is not None or self.type == Message.T_ERROR:
156             json["result"] = self.result
157
158         if self.error is not None or self.type == Message.T_REPLY:
159             json["error"] = self.error
160
161         if self.id is not None or self.type == Message.T_NOTIFY:
162             json["id"] = self.id
163
164         return json
165
166     def __str__(self):
167         s = [Message.type_to_string(self.type)]
168         if self.method is not None:
169             s.append("method=\"%s\"" % self.method)
170         if self.params is not None:
171             s.append("params=" + ovs.json.to_string(self.params))
172         if self.result is not None:
173             s.append("result=" + ovs.json.to_string(self.result))
174         if self.error is not None:
175             s.append("error=" + ovs.json.to_string(self.error))
176         if self.id is not None:
177             s.append("id=" + ovs.json.to_string(self.id))
178         return ", ".join(s)
179
180
181 class Connection(object):
182     def __init__(self, stream):
183         self.name = stream.name
184         self.stream = stream
185         self.status = 0
186         self.input = ""
187         self.output = ""
188         self.parser = None
189
190     def close(self):
191         self.stream.close()
192         self.stream = None
193
194     def run(self):
195         if self.status:
196             return
197
198         while len(self.output):
199             retval = self.stream.send(self.output)
200             if retval >= 0:
201                 self.output = self.output[retval:]
202             else:
203                 if retval != -errno.EAGAIN:
204                     vlog.warn("%s: send error: %s" %
205                               (self.name, os.strerror(-retval)))
206                     self.error(-retval)
207                 break
208
209     def wait(self, poller):
210         if not self.status:
211             self.stream.run_wait(poller)
212             if len(self.output):
213                 self.stream.send_wait()
214
215     def get_status(self):
216         return self.status
217
218     def get_backlog(self):
219         if self.status != 0:
220             return 0
221         else:
222             return len(self.output)
223
224     def __log_msg(self, title, msg):
225         vlog.dbg("%s: %s %s" % (self.name, title, msg))
226
227     def send(self, msg):
228         if self.status:
229             return self.status
230
231         self.__log_msg("send", msg)
232
233         was_empty = len(self.output) == 0
234         self.output += ovs.json.to_string(msg.to_json())
235         if was_empty:
236             self.run()
237         return self.status
238
239     def send_block(self, msg):
240         error = self.send(msg)
241         if error:
242             return error
243
244         while True:
245             self.run()
246             if not self.get_backlog() or self.get_status():
247                 return self.status
248
249             poller = ovs.poller.Poller()
250             self.wait(poller)
251             poller.block()
252
253     def recv(self):
254         if self.status:
255             return self.status, None
256
257         while True:
258             if not self.input:
259                 error, data = self.stream.recv(4096)
260                 if error:
261                     if error == errno.EAGAIN:
262                         return error, None
263                     else:
264                         # XXX rate-limit
265                         vlog.warn("%s: receive error: %s"
266                                   % (self.name, os.strerror(error)))
267                         self.error(error)
268                         return self.status, None
269                 elif not data:
270                     self.error(EOF)
271                     return EOF, None
272                 else:
273                     self.input += data
274             else:
275                 if self.parser is None:
276                     self.parser = ovs.json.Parser()
277                 self.input = self.input[self.parser.feed(self.input):]
278                 if self.parser.is_done():
279                     msg = self.__process_msg()
280                     if msg:
281                         return 0, msg
282                     else:
283                         return self.status, None
284
285     def recv_block(self):
286         while True:
287             error, msg = self.recv()
288             if error != errno.EAGAIN:
289                 return error, msg
290
291             self.run()
292
293             poller = ovs.poller.Poller()
294             self.wait(poller)
295             self.recv_wait(poller)
296             poller.block()
297
298     def transact_block(self, request):
299         id_ = request.id
300
301         error = self.send(request)
302         reply = None
303         while not error:
304             error, reply = self.recv_block()
305             if (reply
306                 and (reply.type == Message.T_REPLY
307                      or reply.type == Message.T_ERROR)
308                 and reply.id == id_):
309                 break
310         return error, reply
311
312     def __process_msg(self):
313         json = self.parser.finish()
314         self.parser = None
315         if type(json) in [str, unicode]:
316             # XXX rate-limit
317             vlog.warn("%s: error parsing stream: %s" % (self.name, json))
318             self.error(errno.EPROTO)
319             return
320
321         msg = Message.from_json(json)
322         if not isinstance(msg, Message):
323             # XXX rate-limit
324             vlog.warn("%s: received bad JSON-RPC message: %s"
325                       % (self.name, msg))
326             self.error(errno.EPROTO)
327             return
328
329         self.__log_msg("received", msg)
330         return msg
331
332     def recv_wait(self, poller):
333         if self.status or self.input:
334             poller.immediate_wake()
335         else:
336             self.stream.recv_wait(poller)
337
338     def error(self, error):
339         if self.status == 0:
340             self.status = error
341             self.stream.close()
342             self.output = ""
343
344
345 class Session(object):
346     """A JSON-RPC session with reconnection."""
347
348     def __init__(self, reconnect, rpc):
349         self.reconnect = reconnect
350         self.rpc = rpc
351         self.stream = None
352         self.pstream = None
353         self.seqno = 0
354
355     @staticmethod
356     def open(name):
357         """Creates and returns a Session that maintains a JSON-RPC session to
358         'name', which should be a string acceptable to ovs.stream.Stream or
359         ovs.stream.PassiveStream's initializer.
360
361         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
362         session connects and reconnects, with back-off, to 'name'.
363
364         If 'name' is a passive connection method, e.g. "ptcp:", the new session
365         listens for connections to 'name'.  It maintains at most one connection
366         at any given time.  Any new connection causes the previous one (if any)
367         to be dropped."""
368         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
369         reconnect.set_name(name)
370         reconnect.enable(ovs.timeval.msec())
371
372         if ovs.stream.PassiveStream.is_valid_name(name):
373             reconnect.set_passive(True, ovs.timeval.msec())
374
375         return Session(reconnect, None)
376
377     @staticmethod
378     def open_unreliably(jsonrpc):
379         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
380         reconnect.set_quiet(True)
381         reconnect.set_name(jsonrpc.name)
382         reconnect.set_max_tries(0)
383         reconnect.connected(ovs.timeval.msec())
384         return Session(reconnect, jsonrpc)
385
386     def close(self):
387         if self.rpc is not None:
388             self.rpc.close()
389             self.rpc = None
390         if self.stream is not None:
391             self.stream.close()
392             self.stream = None
393         if self.pstream is not None:
394             self.pstream.close()
395             self.pstream = None
396
397     def __disconnect(self):
398         if self.rpc is not None:
399             self.rpc.error(EOF)
400             self.rpc.close()
401             self.rpc = None
402             self.seqno += 1
403         elif self.stream is not None:
404             self.stream.close()
405             self.stream = None
406             self.seqno += 1
407
408     def __connect(self):
409         self.__disconnect()
410
411         name = self.reconnect.get_name()
412         if not self.reconnect.is_passive():
413             error, self.stream = ovs.stream.Stream.open(name)
414             if not error:
415                 self.reconnect.connecting(ovs.timeval.msec())
416             else:
417                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
418         elif self.pstream is not None:
419             error, self.pstream = ovs.stream.PassiveStream.open(name)
420             if not error:
421                 self.reconnect.listening(ovs.timeval.msec())
422             else:
423                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
424
425         self.seqno += 1
426
427     def run(self):
428         if self.pstream is not None:
429             error, stream = self.pstream.accept()
430             if error == 0:
431                 if self.rpc or self.stream:
432                     # XXX rate-limit
433                     vlog.info("%s: new connection replacing active "
434                               "connection" % self.reconnect.get_name())
435                     self.__disconnect()
436                 self.reconnect.connected(ovs.timeval.msec())
437                 self.rpc = Connection(stream)
438             elif error != errno.EAGAIN:
439                 self.reconnect.listen_error(ovs.timeval.msec(), error)
440                 self.pstream.close()
441                 self.pstream = None
442
443         if self.rpc:
444             self.rpc.run()
445             error = self.rpc.get_status()
446             if error != 0:
447                 self.reconnect.disconnected(ovs.timeval.msec(), error)
448                 self.__disconnect()
449         elif self.stream is not None:
450             self.stream.run()
451             error = self.stream.connect()
452             if error == 0:
453                 self.reconnect.connected(ovs.timeval.msec())
454                 self.rpc = Connection(self.stream)
455                 self.stream = None
456             elif error != errno.EAGAIN:
457                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
458                 self.stream.close()
459                 self.stream = None
460
461         action = self.reconnect.run(ovs.timeval.msec())
462         if action == ovs.reconnect.CONNECT:
463             self.__connect()
464         elif action == ovs.reconnect.DISCONNECT:
465             self.reconnect.disconnected(ovs.timeval.msec(), 0)
466             self.__disconnect()
467         elif action == ovs.reconnect.PROBE:
468             if self.rpc:
469                 request = Message.create_request("echo", [])
470                 request.id = "echo"
471                 self.rpc.send(request)
472         else:
473             assert action == None
474
475     def wait(self, poller):
476         if self.rpc is not None:
477             self.rpc.wait(poller)
478         elif self.stream is not None:
479             self.stream.run_wait(poller)
480             self.stream.connect_wait(poller)
481         if self.pstream is not None:
482             self.pstream.wait(poller)
483         self.reconnect.wait(poller, ovs.timeval.msec())
484
485     def get_backlog(self):
486         if self.rpc is not None:
487             return self.rpc.get_backlog()
488         else:
489             return 0
490
491     def get_name(self):
492         return self.reconnect.get_name()
493
494     def send(self, msg):
495         if self.rpc is not None:
496             return self.rpc.send(msg)
497         else:
498             return errno.ENOTCONN
499
500     def recv(self):
501         if self.rpc is not None:
502             error, msg = self.rpc.recv()
503             if not error:
504                 self.reconnect.received(ovs.timeval.msec())
505                 if msg.type == Message.T_REQUEST and msg.method == "echo":
506                     # Echo request.  Send reply.
507                     self.send(Message.create_reply(msg.params, msg.id))
508                 elif msg.type == Message.T_REPLY and msg.id == "echo":
509                     # It's a reply to our echo request.  Suppress it.
510                     pass
511                 else:
512                     return msg
513         return None
514
515     def recv_wait(self, poller):
516         if self.rpc is not None:
517             self.rpc.recv_wait(poller)
518
519     def is_alive(self):
520         if self.rpc is not None or self.stream is not None:
521             return True
522         else:
523             max_tries = self.reconnect.get_max_tries()
524             return max_tries is None or max_tries > 0
525
526     def is_connected(self):
527         return self.rpc is not None
528
529     def get_seqno(self):
530         return self.seqno
531
532     def force_reconnect(self):
533         self.reconnect.force_reconnect(ovs.timeval.msec())