jsonrpc: Treat draining data from send queue as activity.
authorBen Pfaff <blp@nicira.com>
Fri, 7 Sep 2012 17:50:15 +0000 (10:50 -0700)
committerBen Pfaff <blp@nicira.com>
Fri, 7 Sep 2012 17:50:15 +0000 (10:50 -0700)
Until now, the jsonrpc module has used messages received from the
remote peer as the sole means to determine that the JSON-RPC
connection is up.  This could in theory interact badly with a
remote peer that stops reading and processing messages from the
receive queue when there is a backlog in the send queue for a
given connection (ovsdb-server is an example of a program that
behaves this way).  This commit fixes the problem by expanding
the definition of "activity" to include successfully sending
JSON-RPC data that was previously queued.

The above change is exactly analogous to the similar change
made to the rconn library in commit 133f2dc95454 (rconn: Treat
draining a message from the send queue as activity.).

Bug #12789.
Signed-off-by: Ben Pfaff <blp@nicira.com>
lib/jsonrpc.c
python/ovs/jsonrpc.py

index c4d7dd2..27b46c6 100644 (file)
@@ -880,9 +880,23 @@ jsonrpc_session_run(struct jsonrpc_session *s)
     }
 
     if (s->rpc) {
+        size_t backlog;
         int error;
 
+        backlog = jsonrpc_get_backlog(s->rpc);
         jsonrpc_run(s->rpc);
+        if (jsonrpc_get_backlog(s->rpc) < backlog) {
+            /* Data previously caught in a queue was successfully sent (or
+             * there's an error, which we'll catch below.)
+             *
+             * We don't count data that is successfully sent immediately as
+             * activity, because there's a lot of queuing downstream from us,
+             * which means that we can push a lot of data into a connection
+             * that has stalled and won't ever recover.
+             */
+            reconnect_activity(s->reconnect, time_msec());
+        }
+
         error = jsonrpc_get_status(s->rpc);
         if (error) {
             reconnect_disconnected(s->reconnect, time_msec(), error);
index cb471cb..0eda32d 100644 (file)
@@ -502,9 +502,19 @@ class Session(object):
 
     def recv(self):
         if self.rpc is not None:
+            backlog = self.rpc.get_backlog()
             error, msg = self.rpc.recv()
-            if not error:
+            if self.rpc.get_backlog() < backlog:
+                # Data previously caught in a queue was successfully sent (or
+                # there's an error, which we'll catch below).
+                #
+                # We don't count data that is successfully sent immediately as
+                # activity, because there's a lot of queuing downstream from
+                # us, which means that we can push a lot of data into a
+                # connection that has stalled and won't ever recover.
                 self.reconnect.activity(ovs.timeval.msec())
+
+            if not error:
                 if msg.type == Message.T_REQUEST and msg.method == "echo":
                     # Echo request.  Send reply.
                     self.send(Message.create_reply(msg.params, msg.id))