-# Copyright (c) 2010, 2011 Nicira Networks
+# Copyright (c) 2010, 2011, 2012, 2013 Nicira, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# limitations under the License.
import errno
-import logging
import os
import ovs.json
import ovs.reconnect
import ovs.stream
import ovs.timeval
+import ovs.util
+import ovs.vlog
+
+EOF = ovs.util.EOF
+vlog = ovs.vlog.Vlog("jsonrpc")
-EOF = -1
class Message(object):
T_REQUEST = 0 # Request.
T_NOTIFY: "notification",
T_REPLY: "reply",
T_ERROR: "error"}
- __next_id = 0
def __init__(self, type_, method, params, result, error, id):
self.type = type_
self.id = id
_next_id = 0
+
@staticmethod
def _create_id():
this_id = Message._next_id
def type_to_string(type_):
return Message.__types[type_]
- @staticmethod
- def __validate_arg(value, name, must_have):
+ def __validate_arg(self, value, name, must_have):
if (value is not None) == (must_have != 0):
return None
else:
return "invalid JSON-RPC message type %s" % self.type
return (
- Message.__validate_arg(self.method, "method", pattern & 0x10000) or
- Message.__validate_arg(self.params, "params", pattern & 0x1000) or
- Message.__validate_arg(self.result, "result", pattern & 0x100) or
- Message.__validate_arg(self.error, "error", pattern & 0x10) or
- Message.__validate_arg(self.id, "id", pattern & 0x1))
+ self.__validate_arg(self.method, "method", pattern & 0x10000) or
+ self.__validate_arg(self.params, "params", pattern & 0x1000) or
+ self.__validate_arg(self.result, "result", pattern & 0x100) or
+ self.__validate_arg(self.error, "error", pattern & 0x10) or
+ self.__validate_arg(self.id, "id", pattern & 0x1))
@staticmethod
def from_json(json):
params = json.pop("params", None)
result = json.pop("result", None)
error = json.pop("error", None)
- id = json.pop("id", None)
+ id_ = json.pop("id", None)
if len(json):
return "message has unexpected member \"%s\"" % json.popitem()[0]
msg_type = Message.T_REPLY
elif error is not None:
msg_type = Message.T_ERROR
- elif id is not None:
+ elif id_ is not None:
msg_type = Message.T_REQUEST
else:
msg_type = Message.T_NOTIFY
-
- msg = Message(msg_type, method, params, result, error, id)
+
+ msg = Message(msg_type, method, params, result, error, id_)
validation_error = msg.is_valid()
if validation_error is not None:
return validation_error
s.append("method=\"%s\"" % self.method)
if self.params is not None:
s.append("params=" + ovs.json.to_string(self.params))
+ if self.result is not None:
+ s.append("result=" + ovs.json.to_string(self.result))
if self.error is not None:
s.append("error=" + ovs.json.to_string(self.error))
if self.id is not None:
s.append("id=" + ovs.json.to_string(self.id))
return ", ".join(s)
+
class Connection(object):
def __init__(self, stream):
- self.name = stream.get_name()
+ self.name = stream.name
self.stream = stream
self.status = 0
self.input = ""
self.output = ""
self.parser = None
+ self.received_bytes = 0
def close(self):
self.stream.close()
self.output = self.output[retval:]
else:
if retval != -errno.EAGAIN:
- logging.warn("%s: send error: %s" % (self.name,
- os.strerror(-retval)))
+ vlog.warn("%s: send error: %s" %
+ (self.name, os.strerror(-retval)))
self.error(-retval)
break
if not self.status:
self.stream.run_wait(poller)
if len(self.output):
- self.stream.send_wait()
+ self.stream.send_wait(poller)
def get_status(self):
return self.status
else:
return len(self.output)
- def get_name(self):
- return self.name
+ def get_received_bytes(self):
+ return self.received_bytes
def __log_msg(self, title, msg):
- logging.debug("%s: %s %s" % (self.name, title, msg))
+ if vlog.dbg_is_enabled():
+ vlog.dbg("%s: %s %s" % (self.name, title, msg))
def send(self, msg):
if self.status:
return self.status, None
while True:
- if len(self.input) == 0:
+ if not self.input:
error, data = self.stream.recv(4096)
if error:
if error == errno.EAGAIN:
return error, None
else:
# XXX rate-limit
- logging.warning("%s: receive error: %s"
- % (self.name, os.strerror(error)))
+ vlog.warn("%s: receive error: %s"
+ % (self.name, os.strerror(error)))
self.error(error)
return self.status, None
- elif len(data) == 0:
+ elif not data:
self.error(EOF)
return EOF, None
else:
self.input += data
+ self.received_bytes += len(data)
else:
if self.parser is None:
self.parser = ovs.json.Parser()
self.wait(poller)
self.recv_wait(poller)
poller.block()
-
+
def transact_block(self, request):
- id = request.id
+ id_ = request.id
error = self.send(request)
reply = None
while not error:
error, reply = self.recv_block()
- if reply and reply.type == Message.T_REPLY and reply.id == id:
+ if (reply
+ and (reply.type == Message.T_REPLY
+ or reply.type == Message.T_ERROR)
+ and reply.id == id_):
break
return error, reply
self.parser = None
if type(json) in [str, unicode]:
# XXX rate-limit
- logging.warning("%s: error parsing stream: %s" % (self.name, json))
+ vlog.warn("%s: error parsing stream: %s" % (self.name, json))
self.error(errno.EPROTO)
return
msg = Message.from_json(json)
if not isinstance(msg, Message):
# XXX rate-limit
- logging.warning("%s: received bad JSON-RPC message: %s"
- % (self.name, msg))
+ vlog.warn("%s: received bad JSON-RPC message: %s"
+ % (self.name, msg))
self.error(errno.EPROTO)
return
self.__log_msg("received", msg)
return msg
-
+
def recv_wait(self, poller):
- if self.status or len(self.input) > 0:
+ if self.status or self.input:
poller.immediate_wake()
else:
self.stream.recv_wait(poller)
self.status = error
self.stream.close()
self.output = ""
-
+
+
class Session(object):
"""A JSON-RPC session with reconnection."""
"""Creates and returns a Session that maintains a JSON-RPC session to
'name', which should be a string acceptable to ovs.stream.Stream or
ovs.stream.PassiveStream's initializer.
-
+
If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
session connects and reconnects, with back-off, to 'name'.
-
+
If 'name' is a passive connection method, e.g. "ptcp:", the new session
listens for connections to 'name'. It maintains at most one connection
at any given time. Any new connection causes the previous one (if any)
reconnect.enable(ovs.timeval.msec())
if ovs.stream.PassiveStream.is_valid_name(name):
- self.reconnect.set_passive(True, ovs.timeval.msec())
+ reconnect.set_passive(True, ovs.timeval.msec())
+
+ if ovs.stream.stream_or_pstream_needs_probes(name):
+ reconnect.set_probe_interval(0)
return Session(reconnect, None)
def open_unreliably(jsonrpc):
reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec())
reconnect.set_quiet(True)
- reconnect.set_name(jsonrpc.get_name())
+ reconnect.set_name(jsonrpc.name)
reconnect.set_max_tries(0)
reconnect.connected(ovs.timeval.msec())
return Session(reconnect, jsonrpc)
self.stream.close()
self.stream = None
self.seqno += 1
-
+
def __connect(self):
self.__disconnect()
if error == 0:
if self.rpc or self.stream:
# XXX rate-limit
- logging.info("%s: new connection replacing active "
- "connection" % self.reconnect.get_name())
+ vlog.info("%s: new connection replacing active "
+ "connection" % self.reconnect.get_name())
self.__disconnect()
self.reconnect.connected(ovs.timeval.msec())
self.rpc = Connection(stream)
self.pstream = None
if self.rpc:
+ backlog = self.rpc.get_backlog()
self.rpc.run()
+ if self.rpc.get_backlog() < backlog:
+ # Data previously caught in a queue was successfully sent (or
+ # there's an error, which we'll catch below).
+ #
+ # We don't count data that is successfully sent immediately as
+ # activity, because there's a lot of queuing downstream from
+ # us, which means that we can push a lot of data into a
+ # connection that has stalled and won't ever recover.
+ self.reconnect.activity(ovs.timeval.msec())
+
error = self.rpc.get_status()
if error != 0:
self.reconnect.disconnected(ovs.timeval.msec(), error)
def recv(self):
if self.rpc is not None:
+ received_bytes = self.rpc.get_received_bytes()
error, msg = self.rpc.recv()
+ if received_bytes != self.rpc.get_received_bytes():
+ # Data was successfully received.
+ #
+ # Previously we only counted receiving a full message as
+ # activity, but with large messages or a slow connection that
+ # policy could time out the session mid-message.
+ self.reconnect.activity(ovs.timeval.msec())
+
if not error:
- self.reconnect.received(ovs.timeval.msec())
if msg.type == Message.T_REQUEST and msg.method == "echo":
# Echo request. Send reply.
self.send(Message.create_reply(msg.params, msg.id))
else:
max_tries = self.reconnect.get_max_tries()
return max_tries is None or max_tries > 0
-
+
def is_connected(self):
return self.rpc is not None