python: Avoid using 'type' as a variable name.
[sliver-openvswitch.git] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18
19 import ovs.json
20 import ovs.poller
21 import ovs.reconnect
22 import ovs.stream
23 import ovs.timeval
24
25 EOF = -1
26
27 class Message(object):
28     T_REQUEST = 0               # Request.
29     T_NOTIFY = 1                # Notification.
30     T_REPLY = 2                 # Successful reply.
31     T_ERROR = 3                 # Error reply.
32
33     __types = {T_REQUEST: "request",
34                T_NOTIFY: "notification",
35                T_REPLY: "reply",
36                T_ERROR: "error"}
37     __next_id = 0
38
39     def __init__(self, type_, method, params, result, error, id):
40         self.type = type_
41         self.method = method
42         self.params = params
43         self.result = result
44         self.error = error
45         self.id = id
46
47     _next_id = 0
48     @staticmethod
49     def _create_id():
50         this_id = Message._next_id
51         Message._next_id += 1
52         return this_id
53
54     @staticmethod
55     def create_request(method, params):
56         return Message(Message.T_REQUEST, method, params, None, None,
57                        Message._create_id())
58
59     @staticmethod
60     def create_notify(method, params):
61         return Message(Message.T_NOTIFY, method, params, None, None,
62                        None)
63
64     @staticmethod
65     def create_reply(result, id):
66         return Message(Message.T_REPLY, None, None, result, None, id)
67
68     @staticmethod
69     def create_error(error, id):
70         return Message(Message.T_ERROR, None, None, None, error, id)
71
72     @staticmethod
73     def type_to_string(type_):
74         return Message.__types[type_]
75
76     @staticmethod
77     def __validate_arg(value, name, must_have):
78         if (value is not None) == (must_have != 0):
79             return None
80         else:
81             type_name = Message.type_to_string(self.type)
82             if must_have:
83                 verb = "must"
84             else:
85                 verb = "must not"
86             return "%s %s have \"%s\"" % (type_name, verb, name)
87
88     def is_valid(self):
89         if self.params is not None and type(self.params) != list:
90             return "\"params\" must be JSON array"
91
92         pattern = {Message.T_REQUEST: 0x11001,
93                    Message.T_NOTIFY:  0x11000,
94                    Message.T_REPLY:   0x00101,
95                    Message.T_ERROR:   0x00011}.get(self.type)
96         if pattern is None:
97             return "invalid JSON-RPC message type %s" % self.type
98
99         return (
100             Message.__validate_arg(self.method, "method", pattern & 0x10000) or
101             Message.__validate_arg(self.params, "params", pattern & 0x1000) or
102             Message.__validate_arg(self.result, "result", pattern & 0x100) or
103             Message.__validate_arg(self.error, "error", pattern & 0x10) or
104             Message.__validate_arg(self.id, "id", pattern & 0x1))
105
106     @staticmethod
107     def from_json(json):
108         if type(json) != dict:
109             return "message is not a JSON object"
110
111         # Make a copy to avoid modifying the caller's dict.
112         json = dict(json)
113
114         if "method" in json:
115             method = json.pop("method")
116             if type(method) not in [str, unicode]:
117                 return "method is not a JSON string"
118         else:
119             method = None
120
121         params = json.pop("params", None)
122         result = json.pop("result", None)
123         error = json.pop("error", None)
124         id = json.pop("id", None)
125         if len(json):
126             return "message has unexpected member \"%s\"" % json.popitem()[0]
127
128         if result is not None:
129             msg_type = Message.T_REPLY
130         elif error is not None:
131             msg_type = Message.T_ERROR
132         elif id is not None:
133             msg_type = Message.T_REQUEST
134         else:
135             msg_type = Message.T_NOTIFY
136         
137         msg = Message(msg_type, method, params, result, error, id)
138         validation_error = msg.is_valid()
139         if validation_error is not None:
140             return validation_error
141         else:
142             return msg
143
144     def to_json(self):
145         json = {}
146
147         if self.method is not None:
148             json["method"] = self.method
149
150         if self.params is not None:
151             json["params"] = self.params
152
153         if self.result is not None or self.type == Message.T_ERROR:
154             json["result"] = self.result
155
156         if self.error is not None or self.type == Message.T_REPLY:
157             json["error"] = self.error
158
159         if self.id is not None or self.type == Message.T_NOTIFY:
160             json["id"] = self.id
161
162         return json
163
164     def __str__(self):
165         s = [Message.type_to_string(self.type)]
166         if self.method is not None:
167             s.append("method=\"%s\"" % self.method)
168         if self.params is not None:
169             s.append("params=" + ovs.json.to_string(self.params))
170         if self.error is not None:
171             s.append("error=" + ovs.json.to_string(self.error))
172         if self.id is not None:
173             s.append("id=" + ovs.json.to_string(self.id))
174         return ", ".join(s)
175
176 class Connection(object):
177     def __init__(self, stream):
178         self.name = stream.get_name()
179         self.stream = stream
180         self.status = 0
181         self.input = ""
182         self.output = ""
183         self.parser = None
184
185     def close(self):
186         self.stream.close()
187         self.stream = None
188
189     def run(self):
190         if self.status:
191             return
192
193         while len(self.output):
194             retval = self.stream.send(self.output)
195             if retval >= 0:
196                 self.output = self.output[retval:]
197             else:
198                 if retval != -errno.EAGAIN:
199                     logging.warn("%s: send error: %s" % (self.name,
200                                                          os.strerror(-retval)))
201                     self.error(-retval)
202                 break
203
204     def wait(self, poller):
205         if not self.status:
206             self.stream.run_wait(poller)
207             if len(self.output):
208                 self.stream.send_wait()
209
210     def get_status(self):
211         return self.status
212
213     def get_backlog(self):
214         if self.status != 0:
215             return 0
216         else:
217             return len(self.output)
218
219     def get_name(self):
220         return self.name
221
222     def __log_msg(self, title, msg):
223         logging.debug("%s: %s %s" % (self.name, title, msg))
224
225     def send(self, msg):
226         if self.status:
227             return self.status
228
229         self.__log_msg("send", msg)
230
231         was_empty = len(self.output) == 0
232         self.output += ovs.json.to_string(msg.to_json())
233         if was_empty:
234             self.run()
235         return self.status
236
237     def send_block(self, msg):
238         error = self.send(msg)
239         if error:
240             return error
241
242         while True:
243             self.run()
244             if not self.get_backlog() or self.get_status():
245                 return self.status
246
247             poller = ovs.poller.Poller()
248             self.wait(poller)
249             poller.block()
250
251     def recv(self):
252         if self.status:
253             return self.status, None
254
255         while True:
256             if len(self.input) == 0:
257                 error, data = self.stream.recv(4096)
258                 if error:
259                     if error == errno.EAGAIN:
260                         return error, None
261                     else:
262                         # XXX rate-limit
263                         logging.warning("%s: receive error: %s"
264                                         % (self.name, os.strerror(error)))
265                         self.error(error)
266                         return self.status, None
267                 elif len(data) == 0:
268                     self.error(EOF)
269                     return EOF, None
270                 else:
271                     self.input += data
272             else:
273                 if self.parser is None:
274                     self.parser = ovs.json.Parser()
275                 self.input = self.input[self.parser.feed(self.input):]
276                 if self.parser.is_done():
277                     msg = self.__process_msg()
278                     if msg:
279                         return 0, msg
280                     else:
281                         return self.status, None
282
283     def recv_block(self):
284         while True:
285             error, msg = self.recv()
286             if error != errno.EAGAIN:
287                 return error, msg
288
289             self.run()
290
291             poller = ovs.poller.Poller()
292             self.wait(poller)
293             self.recv_wait(poller)
294             poller.block()
295     
296     def transact_block(self, request):
297         id = request.id
298
299         error = self.send(request)
300         reply = None
301         while not error:
302             error, reply = self.recv_block()
303             if reply and reply.type == Message.T_REPLY and reply.id == id:
304                 break
305         return error, reply
306
307     def __process_msg(self):
308         json = self.parser.finish()
309         self.parser = None
310         if type(json) in [str, unicode]:
311             # XXX rate-limit
312             logging.warning("%s: error parsing stream: %s" % (self.name, json))
313             self.error(errno.EPROTO)
314             return
315
316         msg = Message.from_json(json)
317         if not isinstance(msg, Message):
318             # XXX rate-limit
319             logging.warning("%s: received bad JSON-RPC message: %s"
320                             % (self.name, msg))
321             self.error(errno.EPROTO)
322             return
323
324         self.__log_msg("received", msg)
325         return msg
326         
327     def recv_wait(self, poller):
328         if self.status or len(self.input) > 0:
329             poller.immediate_wake()
330         else:
331             self.stream.recv_wait(poller)
332
333     def error(self, error):
334         if self.status == 0:
335             self.status = error
336             self.stream.close()
337             self.output = ""
338             
339 class Session(object):
340     """A JSON-RPC session with reconnection."""
341
342     def __init__(self, reconnect, rpc):
343         self.reconnect = reconnect
344         self.rpc = rpc
345         self.stream = None
346         self.pstream = None
347         self.seqno = 0
348
349     @staticmethod
350     def open(name):
351         """Creates and returns a Session that maintains a JSON-RPC session to
352         'name', which should be a string acceptable to ovs.stream.Stream or
353         ovs.stream.PassiveStream's initializer.
354         
355         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
356         session connects and reconnects, with back-off, to 'name'.
357         
358         If 'name' is a passive connection method, e.g. "ptcp:", the new session
359         listens for connections to 'name'.  It maintains at most one connection
360         at any given time.  Any new connection causes the previous one (if any)
361         to be dropped."""
362         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
363         reconnect.set_name(name)
364         reconnect.enable(ovs.timeval.msec())
365
366         if ovs.stream.PassiveStream.is_valid_name(name):
367             self.reconnect.set_passive(True, ovs.timeval.msec())
368
369         return Session(reconnect, None)
370
371     @staticmethod
372     def open_unreliably(jsonrpc):
373         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
374         reconnect.set_quiet(True)
375         reconnect.set_name(jsonrpc.get_name())
376         reconnect.set_max_tries(0)
377         reconnect.connected(ovs.timeval.msec())
378         return Session(reconnect, jsonrpc)
379
380     def close(self):
381         if self.rpc is not None:
382             self.rpc.close()
383             self.rpc = None
384         if self.stream is not None:
385             self.stream.close()
386             self.stream = None
387         if self.pstream is not None:
388             self.pstream.close()
389             self.pstream = None
390
391     def __disconnect(self):
392         if self.rpc is not None:
393             self.rpc.error(EOF)
394             self.rpc.close()
395             self.rpc = None
396             self.seqno += 1
397         elif self.stream is not None:
398             self.stream.close()
399             self.stream = None
400             self.seqno += 1
401     
402     def __connect(self):
403         self.__disconnect()
404
405         name = self.reconnect.get_name()
406         if not self.reconnect.is_passive():
407             error, self.stream = ovs.stream.Stream.open(name)
408             if not error:
409                 self.reconnect.connecting(ovs.timeval.msec())
410             else:
411                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
412         elif self.pstream is not None:
413             error, self.pstream = ovs.stream.PassiveStream.open(name)
414             if not error:
415                 self.reconnect.listening(ovs.timeval.msec())
416             else:
417                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
418
419         self.seqno += 1
420
421     def run(self):
422         if self.pstream is not None:
423             error, stream = self.pstream.accept()
424             if error == 0:
425                 if self.rpc or self.stream:
426                     # XXX rate-limit
427                     logging.info("%s: new connection replacing active "
428                                  "connection" % self.reconnect.get_name())
429                     self.__disconnect()
430                 self.reconnect.connected(ovs.timeval.msec())
431                 self.rpc = Connection(stream)
432             elif error != errno.EAGAIN:
433                 self.reconnect.listen_error(ovs.timeval.msec(), error)
434                 self.pstream.close()
435                 self.pstream = None
436
437         if self.rpc:
438             self.rpc.run()
439             error = self.rpc.get_status()
440             if error != 0:
441                 self.reconnect.disconnected(ovs.timeval.msec(), error)
442                 self.__disconnect()
443         elif self.stream is not None:
444             self.stream.run()
445             error = self.stream.connect()
446             if error == 0:
447                 self.reconnect.connected(ovs.timeval.msec())
448                 self.rpc = Connection(self.stream)
449                 self.stream = None
450             elif error != errno.EAGAIN:
451                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
452                 self.stream.close()
453                 self.stream = None
454
455         action = self.reconnect.run(ovs.timeval.msec())
456         if action == ovs.reconnect.CONNECT:
457             self.__connect()
458         elif action == ovs.reconnect.DISCONNECT:
459             self.reconnect.disconnected(ovs.timeval.msec(), 0)
460             self.__disconnect()
461         elif action == ovs.reconnect.PROBE:
462             if self.rpc:
463                 request = Message.create_request("echo", [])
464                 request.id = "echo"
465                 self.rpc.send(request)
466         else:
467             assert action == None
468
469     def wait(self, poller):
470         if self.rpc is not None:
471             self.rpc.wait(poller)
472         elif self.stream is not None:
473             self.stream.run_wait(poller)
474             self.stream.connect_wait(poller)
475         if self.pstream is not None:
476             self.pstream.wait(poller)
477         self.reconnect.wait(poller, ovs.timeval.msec())
478
479     def get_backlog(self):
480         if self.rpc is not None:
481             return self.rpc.get_backlog()
482         else:
483             return 0
484
485     def get_name(self):
486         return self.reconnect.get_name()
487
488     def send(self, msg):
489         if self.rpc is not None:
490             return self.rpc.send(msg)
491         else:
492             return errno.ENOTCONN
493
494     def recv(self):
495         if self.rpc is not None:
496             error, msg = self.rpc.recv()
497             if not error:
498                 self.reconnect.received(ovs.timeval.msec())
499                 if msg.type == Message.T_REQUEST and msg.method == "echo":
500                     # Echo request.  Send reply.
501                     self.send(Message.create_reply(msg.params, msg.id))
502                 elif msg.type == Message.T_REPLY and msg.id == "echo":
503                     # It's a reply to our echo request.  Suppress it.
504                     pass
505                 else:
506                     return msg
507         return None
508
509     def recv_wait(self, poller):
510         if self.rpc is not None:
511             self.rpc.recv_wait(poller)
512
513     def is_alive(self):
514         if self.rpc is not None or self.stream is not None:
515             return True
516         else:
517             max_tries = self.reconnect.get_max_tries()
518             return max_tries is None or max_tries > 0
519     
520     def is_connected(self):
521         return self.rpc is not None
522
523     def get_seqno(self):
524         return self.seqno
525
526     def force_reconnect(self):
527         self.reconnect.force_reconnect(ovs.timeval.msec())