X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=python%2Fovs%2Fjsonrpc.py;h=c1540eb78e9c59d3af53ee1856e417ce4145d1c5;hb=8a8cd0acd09763f5edca6506bb286447c5776778;hp=cf08131327b77bd316e6cb462f90f8beb6581ce7;hpb=e0edde6fee279cdbbf3c179f5f50adaf0c7c7f1e;p=sliver-openvswitch.git diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index cf0813132..c1540eb78 100644 --- a/python/ovs/jsonrpc.py +++ b/python/ovs/jsonrpc.py @@ -186,6 +186,7 @@ class Connection(object): self.input = "" self.output = "" self.parser = None + self.received_bytes = 0 def close(self): self.stream.close() @@ -210,7 +211,7 @@ class Connection(object): if not self.status: self.stream.run_wait(poller) if len(self.output): - self.stream.send_wait() + self.stream.send_wait(poller) def get_status(self): return self.status @@ -221,6 +222,9 @@ class Connection(object): else: return len(self.output) + def get_received_bytes(self): + return self.received_bytes + def __log_msg(self, title, msg): vlog.dbg("%s: %s %s" % (self.name, title, msg)) @@ -271,6 +275,7 @@ class Connection(object): return EOF, None else: self.input += data + self.received_bytes += len(data) else: if self.parser is None: self.parser = ovs.json.Parser() @@ -444,7 +449,18 @@ class Session(object): self.pstream = None if self.rpc: + backlog = self.rpc.get_backlog() self.rpc.run() + if self.rpc.get_backlog() < backlog: + # Data previously caught in a queue was successfully sent (or + # there's an error, which we'll catch below). + # + # We don't count data that is successfully sent immediately as + # activity, because there's a lot of queuing downstream from + # us, which means that we can push a lot of data into a + # connection that has stalled and won't ever recover. + self.reconnect.activity(ovs.timeval.msec()) + error = self.rpc.get_status() if error != 0: self.reconnect.disconnected(ovs.timeval.msec(), error) @@ -502,9 +518,17 @@ class Session(object): def recv(self): if self.rpc is not None: + received_bytes = self.rpc.get_received_bytes() error, msg = self.rpc.recv() + if received_bytes != self.rpc.get_received_bytes(): + # Data was successfully received. + # + # Previously we only counted receiving a full message as + # activity, but with large messages or a slow connection that + # policy could time out the session mid-message. + self.reconnect.activity(ovs.timeval.msec()) + if not error: - self.reconnect.received(ovs.timeval.msec()) if msg.type == Message.T_REQUEST and msg.method == "echo": # Echo request. Send reply. self.send(Message.create_reply(msg.params, msg.id))