ovs.jsonrpc: Fix static method Session.open() reference to 'self'.
[sliver-openvswitch.git] / python / ovs / jsonrpc.py
1 # Copyright (c) 2010, 2011 Nicira Networks
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import errno
16 import logging
17 import os
18
19 import ovs.json
20 import ovs.poller
21 import ovs.reconnect
22 import ovs.stream
23 import ovs.timeval
24
25 EOF = -1
26
27 class Message(object):
28     T_REQUEST = 0               # Request.
29     T_NOTIFY = 1                # Notification.
30     T_REPLY = 2                 # Successful reply.
31     T_ERROR = 3                 # Error reply.
32
33     __types = {T_REQUEST: "request",
34                T_NOTIFY: "notification",
35                T_REPLY: "reply",
36                T_ERROR: "error"}
37
38     def __init__(self, type_, method, params, result, error, id):
39         self.type = type_
40         self.method = method
41         self.params = params
42         self.result = result
43         self.error = error
44         self.id = id
45
46     _next_id = 0
47     @staticmethod
48     def _create_id():
49         this_id = Message._next_id
50         Message._next_id += 1
51         return this_id
52
53     @staticmethod
54     def create_request(method, params):
55         return Message(Message.T_REQUEST, method, params, None, None,
56                        Message._create_id())
57
58     @staticmethod
59     def create_notify(method, params):
60         return Message(Message.T_NOTIFY, method, params, None, None,
61                        None)
62
63     @staticmethod
64     def create_reply(result, id):
65         return Message(Message.T_REPLY, None, None, result, None, id)
66
67     @staticmethod
68     def create_error(error, id):
69         return Message(Message.T_ERROR, None, None, None, error, id)
70
71     @staticmethod
72     def type_to_string(type_):
73         return Message.__types[type_]
74
75     def __validate_arg(self, value, name, must_have):
76         if (value is not None) == (must_have != 0):
77             return None
78         else:
79             type_name = Message.type_to_string(self.type)
80             if must_have:
81                 verb = "must"
82             else:
83                 verb = "must not"
84             return "%s %s have \"%s\"" % (type_name, verb, name)
85
86     def is_valid(self):
87         if self.params is not None and type(self.params) != list:
88             return "\"params\" must be JSON array"
89
90         pattern = {Message.T_REQUEST: 0x11001,
91                    Message.T_NOTIFY:  0x11000,
92                    Message.T_REPLY:   0x00101,
93                    Message.T_ERROR:   0x00011}.get(self.type)
94         if pattern is None:
95             return "invalid JSON-RPC message type %s" % self.type
96
97         return (
98             self.__validate_arg(self.method, "method", pattern & 0x10000) or
99             self.__validate_arg(self.params, "params", pattern & 0x1000) or
100             self.__validate_arg(self.result, "result", pattern & 0x100) or
101             self.__validate_arg(self.error, "error", pattern & 0x10) or
102             self.__validate_arg(self.id, "id", pattern & 0x1))
103
104     @staticmethod
105     def from_json(json):
106         if type(json) != dict:
107             return "message is not a JSON object"
108
109         # Make a copy to avoid modifying the caller's dict.
110         json = dict(json)
111
112         if "method" in json:
113             method = json.pop("method")
114             if type(method) not in [str, unicode]:
115                 return "method is not a JSON string"
116         else:
117             method = None
118
119         params = json.pop("params", None)
120         result = json.pop("result", None)
121         error = json.pop("error", None)
122         id = json.pop("id", None)
123         if len(json):
124             return "message has unexpected member \"%s\"" % json.popitem()[0]
125
126         if result is not None:
127             msg_type = Message.T_REPLY
128         elif error is not None:
129             msg_type = Message.T_ERROR
130         elif id is not None:
131             msg_type = Message.T_REQUEST
132         else:
133             msg_type = Message.T_NOTIFY
134         
135         msg = Message(msg_type, method, params, result, error, id)
136         validation_error = msg.is_valid()
137         if validation_error is not None:
138             return validation_error
139         else:
140             return msg
141
142     def to_json(self):
143         json = {}
144
145         if self.method is not None:
146             json["method"] = self.method
147
148         if self.params is not None:
149             json["params"] = self.params
150
151         if self.result is not None or self.type == Message.T_ERROR:
152             json["result"] = self.result
153
154         if self.error is not None or self.type == Message.T_REPLY:
155             json["error"] = self.error
156
157         if self.id is not None or self.type == Message.T_NOTIFY:
158             json["id"] = self.id
159
160         return json
161
162     def __str__(self):
163         s = [Message.type_to_string(self.type)]
164         if self.method is not None:
165             s.append("method=\"%s\"" % self.method)
166         if self.params is not None:
167             s.append("params=" + ovs.json.to_string(self.params))
168         if self.error is not None:
169             s.append("error=" + ovs.json.to_string(self.error))
170         if self.id is not None:
171             s.append("id=" + ovs.json.to_string(self.id))
172         return ", ".join(s)
173
174 class Connection(object):
175     def __init__(self, stream):
176         self.name = stream.name
177         self.stream = stream
178         self.status = 0
179         self.input = ""
180         self.output = ""
181         self.parser = None
182
183     def close(self):
184         self.stream.close()
185         self.stream = None
186
187     def run(self):
188         if self.status:
189             return
190
191         while len(self.output):
192             retval = self.stream.send(self.output)
193             if retval >= 0:
194                 self.output = self.output[retval:]
195             else:
196                 if retval != -errno.EAGAIN:
197                     logging.warn("%s: send error: %s" % (self.name,
198                                                          os.strerror(-retval)))
199                     self.error(-retval)
200                 break
201
202     def wait(self, poller):
203         if not self.status:
204             self.stream.run_wait(poller)
205             if len(self.output):
206                 self.stream.send_wait()
207
208     def get_status(self):
209         return self.status
210
211     def get_backlog(self):
212         if self.status != 0:
213             return 0
214         else:
215             return len(self.output)
216
217     def __log_msg(self, title, msg):
218         logging.debug("%s: %s %s" % (self.name, title, msg))
219
220     def send(self, msg):
221         if self.status:
222             return self.status
223
224         self.__log_msg("send", msg)
225
226         was_empty = len(self.output) == 0
227         self.output += ovs.json.to_string(msg.to_json())
228         if was_empty:
229             self.run()
230         return self.status
231
232     def send_block(self, msg):
233         error = self.send(msg)
234         if error:
235             return error
236
237         while True:
238             self.run()
239             if not self.get_backlog() or self.get_status():
240                 return self.status
241
242             poller = ovs.poller.Poller()
243             self.wait(poller)
244             poller.block()
245
246     def recv(self):
247         if self.status:
248             return self.status, None
249
250         while True:
251             if not self.input:
252                 error, data = self.stream.recv(4096)
253                 if error:
254                     if error == errno.EAGAIN:
255                         return error, None
256                     else:
257                         # XXX rate-limit
258                         logging.warning("%s: receive error: %s"
259                                         % (self.name, os.strerror(error)))
260                         self.error(error)
261                         return self.status, None
262                 elif not data:
263                     self.error(EOF)
264                     return EOF, None
265                 else:
266                     self.input += data
267             else:
268                 if self.parser is None:
269                     self.parser = ovs.json.Parser()
270                 self.input = self.input[self.parser.feed(self.input):]
271                 if self.parser.is_done():
272                     msg = self.__process_msg()
273                     if msg:
274                         return 0, msg
275                     else:
276                         return self.status, None
277
278     def recv_block(self):
279         while True:
280             error, msg = self.recv()
281             if error != errno.EAGAIN:
282                 return error, msg
283
284             self.run()
285
286             poller = ovs.poller.Poller()
287             self.wait(poller)
288             self.recv_wait(poller)
289             poller.block()
290     
291     def transact_block(self, request):
292         id = request.id
293
294         error = self.send(request)
295         reply = None
296         while not error:
297             error, reply = self.recv_block()
298             if reply and reply.type == Message.T_REPLY and reply.id == id:
299                 break
300         return error, reply
301
302     def __process_msg(self):
303         json = self.parser.finish()
304         self.parser = None
305         if type(json) in [str, unicode]:
306             # XXX rate-limit
307             logging.warning("%s: error parsing stream: %s" % (self.name, json))
308             self.error(errno.EPROTO)
309             return
310
311         msg = Message.from_json(json)
312         if not isinstance(msg, Message):
313             # XXX rate-limit
314             logging.warning("%s: received bad JSON-RPC message: %s"
315                             % (self.name, msg))
316             self.error(errno.EPROTO)
317             return
318
319         self.__log_msg("received", msg)
320         return msg
321         
322     def recv_wait(self, poller):
323         if self.status or self.input:
324             poller.immediate_wake()
325         else:
326             self.stream.recv_wait(poller)
327
328     def error(self, error):
329         if self.status == 0:
330             self.status = error
331             self.stream.close()
332             self.output = ""
333             
334 class Session(object):
335     """A JSON-RPC session with reconnection."""
336
337     def __init__(self, reconnect, rpc):
338         self.reconnect = reconnect
339         self.rpc = rpc
340         self.stream = None
341         self.pstream = None
342         self.seqno = 0
343
344     @staticmethod
345     def open(name):
346         """Creates and returns a Session that maintains a JSON-RPC session to
347         'name', which should be a string acceptable to ovs.stream.Stream or
348         ovs.stream.PassiveStream's initializer.
349         
350         If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
351         session connects and reconnects, with back-off, to 'name'.
352         
353         If 'name' is a passive connection method, e.g. "ptcp:", the new session
354         listens for connections to 'name'.  It maintains at most one connection
355         at any given time.  Any new connection causes the previous one (if any)
356         to be dropped."""
357         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
358         reconnect.set_name(name)
359         reconnect.enable(ovs.timeval.msec())
360
361         if ovs.stream.PassiveStream.is_valid_name(name):
362             reconnect.set_passive(True, ovs.timeval.msec())
363
364         return Session(reconnect, None)
365
366     @staticmethod
367     def open_unreliably(jsonrpc):
368         reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
369         reconnect.set_quiet(True)
370         reconnect.set_name(jsonrpc.name)
371         reconnect.set_max_tries(0)
372         reconnect.connected(ovs.timeval.msec())
373         return Session(reconnect, jsonrpc)
374
375     def close(self):
376         if self.rpc is not None:
377             self.rpc.close()
378             self.rpc = None
379         if self.stream is not None:
380             self.stream.close()
381             self.stream = None
382         if self.pstream is not None:
383             self.pstream.close()
384             self.pstream = None
385
386     def __disconnect(self):
387         if self.rpc is not None:
388             self.rpc.error(EOF)
389             self.rpc.close()
390             self.rpc = None
391             self.seqno += 1
392         elif self.stream is not None:
393             self.stream.close()
394             self.stream = None
395             self.seqno += 1
396     
397     def __connect(self):
398         self.__disconnect()
399
400         name = self.reconnect.get_name()
401         if not self.reconnect.is_passive():
402             error, self.stream = ovs.stream.Stream.open(name)
403             if not error:
404                 self.reconnect.connecting(ovs.timeval.msec())
405             else:
406                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
407         elif self.pstream is not None:
408             error, self.pstream = ovs.stream.PassiveStream.open(name)
409             if not error:
410                 self.reconnect.listening(ovs.timeval.msec())
411             else:
412                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
413
414         self.seqno += 1
415
416     def run(self):
417         if self.pstream is not None:
418             error, stream = self.pstream.accept()
419             if error == 0:
420                 if self.rpc or self.stream:
421                     # XXX rate-limit
422                     logging.info("%s: new connection replacing active "
423                                  "connection" % self.reconnect.get_name())
424                     self.__disconnect()
425                 self.reconnect.connected(ovs.timeval.msec())
426                 self.rpc = Connection(stream)
427             elif error != errno.EAGAIN:
428                 self.reconnect.listen_error(ovs.timeval.msec(), error)
429                 self.pstream.close()
430                 self.pstream = None
431
432         if self.rpc:
433             self.rpc.run()
434             error = self.rpc.get_status()
435             if error != 0:
436                 self.reconnect.disconnected(ovs.timeval.msec(), error)
437                 self.__disconnect()
438         elif self.stream is not None:
439             self.stream.run()
440             error = self.stream.connect()
441             if error == 0:
442                 self.reconnect.connected(ovs.timeval.msec())
443                 self.rpc = Connection(self.stream)
444                 self.stream = None
445             elif error != errno.EAGAIN:
446                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
447                 self.stream.close()
448                 self.stream = None
449
450         action = self.reconnect.run(ovs.timeval.msec())
451         if action == ovs.reconnect.CONNECT:
452             self.__connect()
453         elif action == ovs.reconnect.DISCONNECT:
454             self.reconnect.disconnected(ovs.timeval.msec(), 0)
455             self.__disconnect()
456         elif action == ovs.reconnect.PROBE:
457             if self.rpc:
458                 request = Message.create_request("echo", [])
459                 request.id = "echo"
460                 self.rpc.send(request)
461         else:
462             assert action == None
463
464     def wait(self, poller):
465         if self.rpc is not None:
466             self.rpc.wait(poller)
467         elif self.stream is not None:
468             self.stream.run_wait(poller)
469             self.stream.connect_wait(poller)
470         if self.pstream is not None:
471             self.pstream.wait(poller)
472         self.reconnect.wait(poller, ovs.timeval.msec())
473
474     def get_backlog(self):
475         if self.rpc is not None:
476             return self.rpc.get_backlog()
477         else:
478             return 0
479
480     def get_name(self):
481         return self.reconnect.get_name()
482
483     def send(self, msg):
484         if self.rpc is not None:
485             return self.rpc.send(msg)
486         else:
487             return errno.ENOTCONN
488
489     def recv(self):
490         if self.rpc is not None:
491             error, msg = self.rpc.recv()
492             if not error:
493                 self.reconnect.received(ovs.timeval.msec())
494                 if msg.type == Message.T_REQUEST and msg.method == "echo":
495                     # Echo request.  Send reply.
496                     self.send(Message.create_reply(msg.params, msg.id))
497                 elif msg.type == Message.T_REPLY and msg.id == "echo":
498                     # It's a reply to our echo request.  Suppress it.
499                     pass
500                 else:
501                     return msg
502         return None
503
504     def recv_wait(self, poller):
505         if self.rpc is not None:
506             self.rpc.recv_wait(poller)
507
508     def is_alive(self):
509         if self.rpc is not None or self.stream is not None:
510             return True
511         else:
512             max_tries = self.reconnect.get_max_tries()
513             return max_tries is None or max_tries > 0
514     
515     def is_connected(self):
516         return self.rpc is not None
517
518     def get_seqno(self):
519         return self.seqno
520
521     def force_reconnect(self):
522         self.reconnect.force_reconnect(ovs.timeval.msec())