Setting tag sliver-openvswitch-2.2.90-1
[sliver-openvswitch.git] / python / ovs / jsonrpc.py
index 1c3f099..6e7a2cc 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2010, 2011 Nicira Networks
+# Copyright (c) 2010, 2011, 2012, 2013 Nicira, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -20,9 +20,10 @@ import ovs.poller
 import ovs.reconnect
 import ovs.stream
 import ovs.timeval
+import ovs.util
 import ovs.vlog
 
-EOF = -1
+EOF = ovs.util.EOF
 vlog = ovs.vlog.Vlog("jsonrpc")
 
 
@@ -185,6 +186,7 @@ class Connection(object):
         self.input = ""
         self.output = ""
         self.parser = None
+        self.received_bytes = 0
 
     def close(self):
         self.stream.close()
@@ -209,7 +211,7 @@ class Connection(object):
         if not self.status:
             self.stream.run_wait(poller)
             if len(self.output):
-                self.stream.send_wait()
+                self.stream.send_wait(poller)
 
     def get_status(self):
         return self.status
@@ -220,8 +222,12 @@ class Connection(object):
         else:
             return len(self.output)
 
+    def get_received_bytes(self):
+        return self.received_bytes
+
     def __log_msg(self, title, msg):
-        vlog.dbg("%s: %s %s" % (self.name, title, msg))
+        if vlog.dbg_is_enabled():
+            vlog.dbg("%s: %s %s" % (self.name, title, msg))
 
     def send(self, msg):
         if self.status:
@@ -270,6 +276,7 @@ class Connection(object):
                     return EOF, None
                 else:
                     self.input += data
+                    self.received_bytes += len(data)
             else:
                 if self.parser is None:
                     self.parser = ovs.json.Parser()
@@ -371,6 +378,9 @@ class Session(object):
         if ovs.stream.PassiveStream.is_valid_name(name):
             reconnect.set_passive(True, ovs.timeval.msec())
 
+        if ovs.stream.stream_or_pstream_needs_probes(name):
+            reconnect.set_probe_interval(0)
+
         return Session(reconnect, None)
 
     @staticmethod
@@ -440,7 +450,18 @@ class Session(object):
                 self.pstream = None
 
         if self.rpc:
+            backlog = self.rpc.get_backlog()
             self.rpc.run()
+            if self.rpc.get_backlog() < backlog:
+                # Data previously caught in a queue was successfully sent (or
+                # there's an error, which we'll catch below).
+                #
+                # We don't count data that is successfully sent immediately as
+                # activity, because there's a lot of queuing downstream from
+                # us, which means that we can push a lot of data into a
+                # connection that has stalled and won't ever recover.
+                self.reconnect.activity(ovs.timeval.msec())
+
             error = self.rpc.get_status()
             if error != 0:
                 self.reconnect.disconnected(ovs.timeval.msec(), error)
@@ -498,9 +519,17 @@ class Session(object):
 
     def recv(self):
         if self.rpc is not None:
+            received_bytes = self.rpc.get_received_bytes()
             error, msg = self.rpc.recv()
+            if received_bytes != self.rpc.get_received_bytes():
+                # Data was successfully received.
+                #
+                # Previously we only counted receiving a full message as
+                # activity, but with large messages or a slow connection that
+                # policy could time out the session mid-message.
+                self.reconnect.activity(ovs.timeval.msec())
+
             if not error:
-                self.reconnect.received(ovs.timeval.msec())
                 if msg.type == Message.T_REQUEST and msg.method == "echo":
                     # Echo request.  Send reply.
                     self.send(Message.create_reply(msg.params, msg.id))