1 # Copyright (c) 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
27 vlog = ovs.vlog.Vlog("jsonrpc")
30 class Message(object):
31 T_REQUEST = 0 # Request.
32 T_NOTIFY = 1 # Notification.
33 T_REPLY = 2 # Successful reply.
34 T_ERROR = 3 # Error reply.
36 __types = {T_REQUEST: "request",
37 T_NOTIFY: "notification",
41 def __init__(self, type_, method, params, result, error, id):
53 this_id = Message._next_id
58 def create_request(method, params):
59 return Message(Message.T_REQUEST, method, params, None, None,
63 def create_notify(method, params):
64 return Message(Message.T_NOTIFY, method, params, None, None,
def create_reply(result, id):
    """Builds and returns a Message of type T_REPLY that carries 'result' and
    is tagged with 'id'.

    The "method", "params" and "error" members of the new message are None."""
    return Message(Message.T_REPLY, method=None, params=None,
                   result=result, error=None, id=id)
def create_error(error, id):
    """Builds and returns a Message of type T_ERROR that carries 'error' and
    is tagged with 'id'.

    The "method", "params" and "result" members of the new message are
    None."""
    return Message(Message.T_ERROR, method=None, params=None,
                   result=None, error=error, id=id)
def type_to_string(type_):
    """Returns the human-readable name (e.g. "request", "notification") for
    the message type constant 'type_'.

    The name is looked up in the Message.__types table, so a 'type_' that is
    not a key of that table raises KeyError."""
    names_by_type = Message.__types
    return names_by_type[type_]
79 def __validate_arg(self, value, name, must_have):
80 if (value is not None) == (must_have != 0):
83 type_name = Message.type_to_string(self.type)
88 return "%s %s have \"%s\"" % (type_name, verb, name)
91 if self.params is not None and type(self.params) != list:
92 return "\"params\" must be JSON array"
94 pattern = {Message.T_REQUEST: 0x11001,
95 Message.T_NOTIFY: 0x11000,
96 Message.T_REPLY: 0x00101,
97 Message.T_ERROR: 0x00011}.get(self.type)
99 return "invalid JSON-RPC message type %s" % self.type
102 self.__validate_arg(self.method, "method", pattern & 0x10000) or
103 self.__validate_arg(self.params, "params", pattern & 0x1000) or
104 self.__validate_arg(self.result, "result", pattern & 0x100) or
105 self.__validate_arg(self.error, "error", pattern & 0x10) or
106 self.__validate_arg(self.id, "id", pattern & 0x1))
110 if type(json) != dict:
111 return "message is not a JSON object"
113 # Make a copy to avoid modifying the caller's dict.
117 method = json.pop("method")
118 if type(method) not in [str, unicode]:
119 return "method is not a JSON string"
123 params = json.pop("params", None)
124 result = json.pop("result", None)
125 error = json.pop("error", None)
126 id_ = json.pop("id", None)
128 return "message has unexpected member \"%s\"" % json.popitem()[0]
130 if result is not None:
131 msg_type = Message.T_REPLY
132 elif error is not None:
133 msg_type = Message.T_ERROR
134 elif id_ is not None:
135 msg_type = Message.T_REQUEST
137 msg_type = Message.T_NOTIFY
139 msg = Message(msg_type, method, params, result, error, id_)
140 validation_error = msg.is_valid()
141 if validation_error is not None:
142 return validation_error
149 if self.method is not None:
150 json["method"] = self.method
152 if self.params is not None:
153 json["params"] = self.params
155 if self.result is not None or self.type == Message.T_ERROR:
156 json["result"] = self.result
158 if self.error is not None or self.type == Message.T_REPLY:
159 json["error"] = self.error
161 if self.id is not None or self.type == Message.T_NOTIFY:
167 s = [Message.type_to_string(self.type)]
168 if self.method is not None:
169 s.append("method=\"%s\"" % self.method)
170 if self.params is not None:
171 s.append("params=" + ovs.json.to_string(self.params))
172 if self.result is not None:
173 s.append("result=" + ovs.json.to_string(self.result))
174 if self.error is not None:
175 s.append("error=" + ovs.json.to_string(self.error))
176 if self.id is not None:
177 s.append("id=" + ovs.json.to_string(self.id))
181 class Connection(object):
182 def __init__(self, stream):
183 self.name = stream.name
189 self.received_bytes = 0
199 while len(self.output):
200 retval = self.stream.send(self.output)
202 self.output = self.output[retval:]
204 if retval != -errno.EAGAIN:
205 vlog.warn("%s: send error: %s" %
206 (self.name, os.strerror(-retval)))
210 def wait(self, poller):
212 self.stream.run_wait(poller)
214 self.stream.send_wait(poller)
216 def get_status(self):
219 def get_backlog(self):
223 return len(self.output)
def get_received_bytes(self):
    """Returns the current value of this connection's 'received_bytes'
    counter, which the receive path increases by the length of each chunk of
    data it reads."""
    total = self.received_bytes
    return total
def __log_msg(self, title, msg):
    """Logs 'msg' at debug level, prefixed by this connection's name and by
    'title' (e.g. "send" or "received").

    The log line is only formatted when debug logging is enabled."""
    if not vlog.dbg_is_enabled():
        return
    vlog.dbg("%s: %s %s" % (self.name, title, msg))
236 self.__log_msg("send", msg)
238 was_empty = len(self.output) == 0
239 self.output += ovs.json.to_string(msg.to_json())
244 def send_block(self, msg):
245 error = self.send(msg)
251 if not self.get_backlog() or self.get_status():
254 poller = ovs.poller.Poller()
260 return self.status, None
264 error, data = self.stream.recv(4096)
266 if error == errno.EAGAIN:
270 vlog.warn("%s: receive error: %s"
271 % (self.name, os.strerror(error)))
273 return self.status, None
279 self.received_bytes += len(data)
281 if self.parser is None:
282 self.parser = ovs.json.Parser()
283 self.input = self.input[self.parser.feed(self.input):]
284 if self.parser.is_done():
285 msg = self.__process_msg()
289 return self.status, None
291 def recv_block(self):
293 error, msg = self.recv()
294 if error != errno.EAGAIN:
299 poller = ovs.poller.Poller()
301 self.recv_wait(poller)
304 def transact_block(self, request):
307 error = self.send(request)
310 error, reply = self.recv_block()
312 and (reply.type == Message.T_REPLY
313 or reply.type == Message.T_ERROR)
314 and reply.id == id_):
318 def __process_msg(self):
319 json = self.parser.finish()
321 if type(json) in [str, unicode]:
323 vlog.warn("%s: error parsing stream: %s" % (self.name, json))
324 self.error(errno.EPROTO)
327 msg = Message.from_json(json)
328 if not isinstance(msg, Message):
330 vlog.warn("%s: received bad JSON-RPC message: %s"
332 self.error(errno.EPROTO)
335 self.__log_msg("received", msg)
338 def recv_wait(self, poller):
339 if self.status or self.input:
340 poller.immediate_wake()
342 self.stream.recv_wait(poller)
344 def error(self, error):
351 class Session(object):
352 """A JSON-RPC session with reconnection."""
354 def __init__(self, reconnect, rpc):
355 self.reconnect = reconnect
363 """Creates and returns a Session that maintains a JSON-RPC session to
364 'name', which should be a string acceptable to ovs.stream.Stream or
365 ovs.stream.PassiveStream's initializer.
367 If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
368 session connects and reconnects, with back-off, to 'name'.
370 If 'name' is a passive connection method, e.g. "ptcp:", the new session
371 listens for connections to 'name'. It maintains at most one connection
372 at any given time. Any new connection causes the previous one (if any)
374 reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
375 reconnect.set_name(name)
376 reconnect.enable(ovs.timeval.msec())
378 if ovs.stream.PassiveStream.is_valid_name(name):
379 reconnect.set_passive(True, ovs.timeval.msec())
381 if ovs.stream.stream_or_pstream_needs_probes(name):
382 reconnect.set_probe_interval(0)
384 return Session(reconnect, None)
def open_unreliably(jsonrpc):
    """Creates and returns a Session wrapped around the existing connection
    'jsonrpc'.

    The session's reconnect state machine is named after 'jsonrpc', put in
    quiet mode, limited to a maximum of 0 tries, and marked as already
    connected as of the current time."""
    fsm = ovs.reconnect.Reconnect(ovs.timeval.msec())
    fsm.set_quiet(True)
    fsm.set_name(jsonrpc.name)
    fsm.set_max_tries(0)
    fsm.connected(ovs.timeval.msec())
    return Session(reconnect=fsm, rpc=jsonrpc)
396 if self.rpc is not None:
399 if self.stream is not None:
402 if self.pstream is not None:
406 def __disconnect(self):
407 if self.rpc is not None:
412 elif self.stream is not None:
420 name = self.reconnect.get_name()
421 if not self.reconnect.is_passive():
422 error, self.stream = ovs.stream.Stream.open(name)
424 self.reconnect.connecting(ovs.timeval.msec())
426 self.reconnect.connect_failed(ovs.timeval.msec(), error)
427 elif self.pstream is not None:
428 error, self.pstream = ovs.stream.PassiveStream.open(name)
430 self.reconnect.listening(ovs.timeval.msec())
432 self.reconnect.connect_failed(ovs.timeval.msec(), error)
437 if self.pstream is not None:
438 error, stream = self.pstream.accept()
440 if self.rpc or self.stream:
442 vlog.info("%s: new connection replacing active "
443 "connection" % self.reconnect.get_name())
445 self.reconnect.connected(ovs.timeval.msec())
446 self.rpc = Connection(stream)
447 elif error != errno.EAGAIN:
448 self.reconnect.listen_error(ovs.timeval.msec(), error)
453 backlog = self.rpc.get_backlog()
455 if self.rpc.get_backlog() < backlog:
456 # Data previously caught in a queue was successfully sent (or
457 # there's an error, which we'll catch below).
459 # We don't count data that is successfully sent immediately as
460 # activity, because there's a lot of queuing downstream from
461 # us, which means that we can push a lot of data into a
462 # connection that has stalled and won't ever recover.
463 self.reconnect.activity(ovs.timeval.msec())
465 error = self.rpc.get_status()
467 self.reconnect.disconnected(ovs.timeval.msec(), error)
469 elif self.stream is not None:
471 error = self.stream.connect()
473 self.reconnect.connected(ovs.timeval.msec())
474 self.rpc = Connection(self.stream)
476 elif error != errno.EAGAIN:
477 self.reconnect.connect_failed(ovs.timeval.msec(), error)
481 action = self.reconnect.run(ovs.timeval.msec())
482 if action == ovs.reconnect.CONNECT:
484 elif action == ovs.reconnect.DISCONNECT:
485 self.reconnect.disconnected(ovs.timeval.msec(), 0)
487 elif action == ovs.reconnect.PROBE:
489 request = Message.create_request("echo", [])
491 self.rpc.send(request)
493 assert action == None
def wait(self, poller):
    """Registers with 'poller' the events this session is waiting for.

    An established connection (self.rpc) registers its own wait.  Otherwise,
    a stream that is still connecting (self.stream) registers its run and
    connect waits.  Independently of those, a passive listener
    (self.pstream), if any, registers its wait, and finally the reconnect
    state machine registers its wait using the current time."""
    if self.rpc is None:
        # No established connection: wait on the in-progress connect, if any.
        if self.stream is not None:
            self.stream.run_wait(poller)
            self.stream.connect_wait(poller)
    else:
        self.rpc.wait(poller)

    if self.pstream is not None:
        self.pstream.wait(poller)

    now = ovs.timeval.msec()
    self.reconnect.wait(poller, now)
505 def get_backlog(self):
506 if self.rpc is not None:
507 return self.rpc.get_backlog()
512 return self.reconnect.get_name()
515 if self.rpc is not None:
516 return self.rpc.send(msg)
518 return errno.ENOTCONN
521 if self.rpc is not None:
522 received_bytes = self.rpc.get_received_bytes()
523 error, msg = self.rpc.recv()
524 if received_bytes != self.rpc.get_received_bytes():
525 # Data was successfully received.
527 # Previously we only counted receiving a full message as
528 # activity, but with large messages or a slow connection that
529 # policy could time out the session mid-message.
530 self.reconnect.activity(ovs.timeval.msec())
533 if msg.type == Message.T_REQUEST and msg.method == "echo":
534 # Echo request. Send reply.
535 self.send(Message.create_reply(msg.params, msg.id))
536 elif msg.type == Message.T_REPLY and msg.id == "echo":
537 # It's a reply to our echo request. Suppress it.
def recv_wait(self, poller):
    """Asks the underlying connection, if there is one, to register its
    receive wait with 'poller'.  Does nothing while the session has no
    connection (self.rpc is None)."""
    conn = self.rpc
    if conn is None:
        return
    conn.recv_wait(poller)
548 if self.rpc is not None or self.stream is not None:
551 max_tries = self.reconnect.get_max_tries()
552 return max_tries is None or max_tries > 0
def is_connected(self):
    """Returns True if this session currently holds a connection (self.rpc
    is set), False otherwise."""
    if self.rpc is None:
        return False
    return True
def force_reconnect(self):
    """Passes a force-reconnect request, stamped with the current time, to
    this session's reconnect state machine."""
    now = ovs.timeval.msec()
    self.reconnect.force_reconnect(now)