Drop rconn's responsibility for limiting the tx queue.
[sliver-openvswitch.git] / lib / rconn.c
index 9bb2b5c..6bc371a 100644 (file)
@@ -83,7 +83,6 @@ struct rconn {
     bool reliable;
 
     struct queue txq;
-    int txq_limit;
 
     int backoff;
     int max_backoff;
@@ -119,31 +118,28 @@ static void state_transition(struct rconn *, enum state);
 static int try_send(struct rconn *);
 static int reconnect(struct rconn *);
 static void disconnect(struct rconn *, int error);
+static void flush_queue(struct rconn *);
 static void question_connectivity(struct rconn *);
 
 /* Creates a new rconn, connects it (reliably) to 'name', and returns it. */
 struct rconn *
-rconn_new(const char *name, int txq_limit, int inactivity_probe_interval,
-          int max_backoff) 
+rconn_new(const char *name, int inactivity_probe_interval, int max_backoff)
 {
-    struct rconn *rc = rconn_create(txq_limit, inactivity_probe_interval,
-                                    max_backoff);
+    struct rconn *rc = rconn_create(inactivity_probe_interval, max_backoff);
     rconn_connect(rc, name);
     return rc;
 }
 
 /* Creates a new rconn, connects it (unreliably) to 'vconn', and returns it. */
 struct rconn *
-rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) 
+rconn_new_from_vconn(const char *name, struct vconn *vconn) 
 {
-    struct rconn *rc = rconn_create(txq_limit, 60, 0);
+    struct rconn *rc = rconn_create(60, 0);
     rconn_connect_unreliably(rc, name, vconn);
     return rc;
 }
 
 /* Creates and returns a new rconn.
- *
- * 'txq_limit' is the maximum length of the send queue, in packets.
  *
  * 'probe_interval' is a number of seconds.  If the interval passes once
  * without an OpenFlow message being received from the peer, the rconn sends
@@ -156,31 +152,29 @@ rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn)
  * failure until it reaches 'max_backoff'.  If 0 is specified, the default of
  * 60 seconds is used. */
 struct rconn *
-rconn_create(int txq_limit, int probe_interval, int max_backoff)
+rconn_create(int probe_interval, int max_backoff)
 {
     struct rconn *rc = xcalloc(1, sizeof *rc);
 
     rc->state = S_VOID;
-    rc->state_entered = time(0);
+    rc->state_entered = time_now();
 
     rc->vconn = NULL;
     rc->name = xstrdup("void");
     rc->reliable = false;
 
     queue_init(&rc->txq);
-    assert(txq_limit > 0);
-    rc->txq_limit = txq_limit;
 
     rc->backoff = 0;
     rc->max_backoff = max_backoff ? max_backoff : 60;
     rc->backoff_deadline = TIME_MIN;
-    rc->last_received = time(0);
-    rc->last_connected = time(0);
+    rc->last_received = time_now();
+    rc->last_connected = time_now();
 
     rc->packets_sent = 0;
 
     rc->questionable_connectivity = false;
-    rc->last_questioned = time(0);
+    rc->last_questioned = time_now();
 
     rc->probe_interval = probe_interval ? MAX(5, probe_interval) : 0;
 
@@ -207,7 +201,7 @@ rconn_connect_unreliably(struct rconn *rc,
     rc->name = xstrdup(name);
     rc->reliable = false;
     rc->vconn = vconn;
-    rc->last_connected = time(0);
+    rc->last_connected = time_now();
     state_transition(rc, S_ACTIVE);
 }
 
@@ -235,6 +229,7 @@ rconn_destroy(struct rconn *rc)
     if (rc) {
         free(rc->name);
         vconn_close(rc->vconn);
+        flush_queue(rc);
         queue_destroy(&rc->txq);
         free(rc);
     }
@@ -260,7 +255,7 @@ reconnect(struct rconn *rc)
     VLOG_WARN("%s: connecting...", rc->name);
     retval = vconn_open(rc->name, &rc->vconn);
     if (!retval) {
-        rc->backoff_deadline = time(0) + rc->backoff;
+        rc->backoff_deadline = time_now() + rc->backoff;
         state_transition(rc, S_CONNECTING);
     } else {
         VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(retval));
@@ -341,7 +336,7 @@ run_ACTIVE(struct rconn *rc)
         unsigned int base = MAX(rc->last_received, rc->state_entered);
         queue_push_tail(&rc->txq, make_echo_request());
         VLOG_DBG("%s: idle %u seconds, sending inactivity probe",
-                 rc->name, (unsigned int) (time(0) - base));
+                 rc->name, (unsigned int) (time_now() - base));
         state_transition(rc, S_IDLE);
         return;
     }
@@ -413,7 +408,7 @@ rconn_recv(struct rconn *rc)
         struct buffer *buffer;
         int error = vconn_recv(rc->vconn, &buffer);
         if (!error) {
-            rc->last_received = time(0);
+            rc->last_received = time_now();
             if (rc->state == S_IDLE) {
                 state_transition(rc, S_ACTIVE);
             }
@@ -435,55 +430,59 @@ rconn_recv_wait(struct rconn *rc)
     }
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if at least 'txq_limit'
- * packets are already queued, otherwise a positive errno value. */
+/* Sends 'b' on 'rc'.  Returns 0 if successful (in which case 'b' is
+ * destroyed), or ENOTCONN if 'rc' is not currently connected (in which case
+ * the caller retains ownership of 'b').
+ *
+ * If 'n_queued' is non-null, then '*n_queued' will be incremented while the
+ * packet is in flight, then decremented when it has been sent (or discarded
+ * due to disconnection).  Because 'b' may be sent (or discarded) before this
+ * function returns, the caller may not be able to observe any change in
+ * '*n_queued'.
+ *
+ * There is no rconn_send_wait() function: an rconn has a send queue that it
+ * takes care of sending if you call rconn_run(), which will have the side
+ * effect of waking up poll_block(). */
 int
-do_send(struct rconn *rc, struct buffer *b, int txq_limit)
+rconn_send(struct rconn *rc, struct buffer *b, int *n_queued)
 {
     if (rc->vconn) {
-        if (rc->txq.n < txq_limit) {
-            queue_push_tail(&rc->txq, b);
-            if (rc->txq.n == 1) {
-                try_send(rc);
-            }
-            return 0;
-        } else {
-            return EAGAIN;
+        b->private = n_queued;
+        if (n_queued) {
+            ++*n_queued;
         }
+        queue_push_tail(&rc->txq, b);
+        if (rc->txq.n == 1) {
+            try_send(rc);
+        }
+        return 0;
     } else {
         return ENOTCONN;
     }
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if the send queue is
- * full, or ENOTCONN if 'rc' is not currently connected.
+/* Sends 'b' on 'rc'.  Increments '*n_queued' while the packet is in flight; it
+ * will be decremented when it has been sent (or discarded due to
+ * disconnection).  Returns 0 if successful, EAGAIN if '*n_queued' is already
+ * at least as large of 'queue_limit', or ENOTCONN if 'rc' is not currently
+ * connected.  Regardless of return value, 'b' is destroyed.
+ *
+ * Because 'b' may be sent (or discarded) before this function returns, the
+ * caller may not be able to observe any change in '*n_queued'.
  *
  * There is no rconn_send_wait() function: an rconn has a send queue that it
  * takes care of sending if you call rconn_run(), which will have the side
  * effect of waking up poll_block(). */
 int
-rconn_send(struct rconn *rc, struct buffer *b)
-{
-    return do_send(rc, b, rc->txq_limit);
-}
-
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if the send queue is
- * full, otherwise a positive errno value.
- *
- * Compared to rconn_send(), this function relaxes the queue limit, allowing
- * more packets than usual to be queued. */
-int
-rconn_force_send(struct rconn *rc, struct buffer *b)
+rconn_send_with_limit(struct rconn *rc, struct buffer *b,
+                      int *n_queued, int queue_limit)
 {
-    return do_send(rc, b, 2 * rc->txq_limit);
-}
-
-/* Returns true if 'rc''s send buffer is full,
- * false if it has room for at least one more packet. */
-bool
-rconn_is_full(const struct rconn *rc)
-{
-    return rc->txq.n >= rc->txq_limit;
+    int retval;
+    retval = *n_queued >= queue_limit ? EAGAIN : rconn_send(rc, b, n_queued);
+    if (retval) {
+        buffer_delete(b);
+    }
+    return retval;
 }
 
 /* Returns the total number of packets successfully sent on the underlying
@@ -522,7 +521,7 @@ rconn_is_connected(const struct rconn *rconn)
 int
 rconn_disconnected_duration(const struct rconn *rconn)
 {
-    return rconn_is_connected(rconn) ? 0 : time(0) - rconn->last_received;
+    return rconn_is_connected(rconn) ? 0 : time_now() - rconn->last_received;
 }
 
 /* Returns the IP address of the peer, or 0 if the peer is not connected over
@@ -556,6 +555,7 @@ try_send(struct rconn *rc)
 {
     int retval = 0;
     struct buffer *next = rc->txq.head->next;
+    int *n_queued = rc->txq.head->private;
     retval = vconn_send(rc->vconn, rc->txq.head);
     if (retval) {
         if (retval != EAGAIN) {
@@ -564,6 +564,9 @@ try_send(struct rconn *rc)
         return retval;
     }
     rc->packets_sent++;
+    if (n_queued) {
+        --*n_queued;
+    }
     queue_advance_head(&rc->txq, next);
     return 0;
 }
@@ -575,7 +578,7 @@ static void
 disconnect(struct rconn *rc, int error)
 {
     if (rc->reliable) {
-        time_t now = time(0);
+        time_t now = time_now();
 
         if (rc->state & (S_CONNECTING | S_ACTIVE | S_IDLE)) {
             if (error > 0) {
@@ -590,7 +593,7 @@ disconnect(struct rconn *rc, int error)
             }
             vconn_close(rc->vconn);
             rc->vconn = NULL;
-            queue_clear(&rc->txq);
+            flush_queue(rc);
         }
 
         if (now >= rc->backoff_deadline) {
@@ -610,10 +613,25 @@ disconnect(struct rconn *rc, int error)
     }
 }
 
+/* Drops all the packets from 'rc''s send queue and decrements their queue
+ * counts. */
+static void
+flush_queue(struct rconn *rc)
+{
+    while (rc->txq.n > 0) {
+        struct buffer *b = queue_pop_head(&rc->txq);
+        int *n_queued = b->private;
+        if (n_queued) {
+            --*n_queued;
+        }
+        buffer_delete(b);
+    }
+}
+
 static unsigned int
 elapsed_in_this_state(const struct rconn *rc)
 {
-    return time(0) - rc->state_entered;
+    return time_now() - rc->state_entered;
 }
 
 static unsigned int
@@ -631,7 +649,7 @@ timeout(const struct rconn *rc)
 static bool
 timed_out(const struct rconn *rc)
 {
-    return time(0) >= sat_add(rc->state_entered, timeout(rc));
+    return time_now() >= sat_add(rc->state_entered, timeout(rc));
 }
 
 static void
@@ -639,7 +657,7 @@ state_transition(struct rconn *rc, enum state state)
 {
     VLOG_DBG("%s: entering %s", rc->name, state_name(state));
     rc->state = state;
-    rc->state_entered = time(0);
+    rc->state_entered = time_now();
 }
 
 static unsigned int
@@ -658,7 +676,7 @@ sat_mul(unsigned int x, unsigned int y)
 static void
 question_connectivity(struct rconn *rc) 
 {
-    time_t now = time(0);
+    time_t now = time_now();
     if (now - rc->last_questioned > 60) {
         rc->questionable_connectivity = true;
         rc->last_questioned = now;