Drop rconn's responsibility for limiting the tx queue.
[sliver-openvswitch.git] / lib / rconn.c
index 7f5a1dc..6bc371a 100644 (file)
@@ -31,6 +31,7 @@
  * derivatives without specific, written prior permission.
  */
 
+#include <config.h>
 #include "rconn.h"
 #include <assert.h>
 #include <errno.h>
@@ -76,14 +77,12 @@ state_name(enum state state)
 struct rconn {
     enum state state;
     time_t state_entered;
-    unsigned int min_timeout;
 
     struct vconn *vconn;
     char *name;
     bool reliable;
 
     struct queue txq;
-    int txq_limit;
 
     int backoff;
     int max_backoff;
@@ -113,36 +112,34 @@ struct rconn {
 static unsigned int sat_add(unsigned int x, unsigned int y);
 static unsigned int sat_mul(unsigned int x, unsigned int y);
 static unsigned int elapsed_in_this_state(const struct rconn *);
-static bool timeout(struct rconn *, unsigned int secs);
+static unsigned int timeout(const struct rconn *);
+static bool timed_out(const struct rconn *);
 static void state_transition(struct rconn *, enum state);
 static int try_send(struct rconn *);
-static void reconnect(struct rconn *);
+static int reconnect(struct rconn *);
 static void disconnect(struct rconn *, int error);
+static void flush_queue(struct rconn *);
 static void question_connectivity(struct rconn *);
 
 /* Creates a new rconn, connects it (reliably) to 'name', and returns it. */
 struct rconn *
-rconn_new(const char *name, int txq_limit, int inactivity_probe_interval,
-          int max_backoff) 
+rconn_new(const char *name, int inactivity_probe_interval, int max_backoff)
 {
-    struct rconn *rc = rconn_create(txq_limit, inactivity_probe_interval,
-                                    max_backoff);
+    struct rconn *rc = rconn_create(inactivity_probe_interval, max_backoff);
     rconn_connect(rc, name);
     return rc;
 }
 
 /* Creates a new rconn, connects it (unreliably) to 'vconn', and returns it. */
 struct rconn *
-rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) 
+rconn_new_from_vconn(const char *name, struct vconn *vconn) 
 {
-    struct rconn *rc = rconn_create(txq_limit, 60, 0);
+    struct rconn *rc = rconn_create(60, 0);
     rconn_connect_unreliably(rc, name, vconn);
     return rc;
 }
 
 /* Creates and returns a new rconn.
- *
- * 'txq_limit' is the maximum length of the send queue, in packets.
  *
  * 'probe_interval' is a number of seconds.  If the interval passes once
  * without an OpenFlow message being received from the peer, the rconn sends
@@ -155,46 +152,43 @@ rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn)
  * failure until it reaches 'max_backoff'.  If 0 is specified, the default of
  * 60 seconds is used. */
 struct rconn *
-rconn_create(int txq_limit, int probe_interval, int max_backoff)
+rconn_create(int probe_interval, int max_backoff)
 {
     struct rconn *rc = xcalloc(1, sizeof *rc);
 
     rc->state = S_VOID;
-    rc->state_entered = time(0);
-    rc->min_timeout = 0;
+    rc->state_entered = time_now();
 
     rc->vconn = NULL;
     rc->name = xstrdup("void");
     rc->reliable = false;
 
     queue_init(&rc->txq);
-    assert(txq_limit > 0);
-    rc->txq_limit = txq_limit;
 
     rc->backoff = 0;
     rc->max_backoff = max_backoff ? max_backoff : 60;
     rc->backoff_deadline = TIME_MIN;
-    rc->last_received = time(0);
-    rc->last_connected = time(0);
+    rc->last_received = time_now();
+    rc->last_connected = time_now();
 
     rc->packets_sent = 0;
 
     rc->questionable_connectivity = false;
-    rc->last_questioned = time(0);
+    rc->last_questioned = time_now();
 
     rc->probe_interval = probe_interval ? MAX(5, probe_interval) : 0;
 
     return rc;
 }
 
-void
+int
 rconn_connect(struct rconn *rc, const char *name)
 {
     rconn_disconnect(rc);
     free(rc->name);
     rc->name = xstrdup(name);
     rc->reliable = true;
-    reconnect(rc);
+    return reconnect(rc);
 }
 
 void
@@ -207,7 +201,7 @@ rconn_connect_unreliably(struct rconn *rc,
     rc->name = xstrdup(name);
     rc->reliable = false;
     rc->vconn = vconn;
-    rc->last_connected = time(0);
+    rc->last_connected = time_now();
     state_transition(rc, S_ACTIVE);
 }
 
@@ -235,18 +229,25 @@ rconn_destroy(struct rconn *rc)
     if (rc) {
         free(rc->name);
         vconn_close(rc->vconn);
+        flush_queue(rc);
         queue_destroy(&rc->txq);
         free(rc);
     }
 }
 
+static unsigned int
+timeout_VOID(const struct rconn *rc)
+{
+    return UINT_MAX;
+}
+
 static void
 run_VOID(struct rconn *rc)
 {
     /* Nothing to do. */
 }
 
-static void
+static int
 reconnect(struct rconn *rc)
 {
     int retval;
@@ -254,38 +255,52 @@ reconnect(struct rconn *rc)
     VLOG_WARN("%s: connecting...", rc->name);
     retval = vconn_open(rc->name, &rc->vconn);
     if (!retval) {
-        rc->backoff_deadline = time(0) + rc->backoff;
+        rc->backoff_deadline = time_now() + rc->backoff;
         state_transition(rc, S_CONNECTING);
     } else {
         VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(retval));
         disconnect(rc, 0);
     }
+    return retval;
+}
+
+static unsigned int
+timeout_BACKOFF(const struct rconn *rc)
+{
+    return rc->backoff;
 }
 
 static void
 run_BACKOFF(struct rconn *rc)
 {
-    if (timeout(rc, rc->backoff)) {
+    if (timed_out(rc)) {
         reconnect(rc);
     }
 }
 
+static unsigned int
+timeout_CONNECTING(const struct rconn *rc)
+{
+    return MAX(1, rc->backoff);
+}
+
 static void
 run_CONNECTING(struct rconn *rc)
 {
-    int error = vconn_connect(rc->vconn);
-    if (!error) {
+    int retval = vconn_connect(rc->vconn);
+    if (!retval) {
         VLOG_WARN("%s: connected", rc->name);
         if (vconn_is_passive(rc->vconn)) {
-            fatal(0, "%s: passive vconn not supported in switch",
-                  rc->name);
+            error(0, "%s: passive vconn not supported", rc->name);
+            state_transition(rc, S_VOID);
+        } else {
+            state_transition(rc, S_ACTIVE);
+            rc->last_connected = rc->state_entered;
         }
-        state_transition(rc, S_ACTIVE);
-        rc->last_connected = rc->state_entered;
-    } else if (error != EAGAIN) {
-        VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(error));
-        disconnect(rc, error);
-    } else if (timeout(rc, MAX(1, rc->backoff))) {
+    } else if (retval != EAGAIN) {
+        VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(retval));
+        disconnect(rc, retval);
+    } else if (timed_out(rc)) {
         VLOG_WARN("%s: connection timed out", rc->name);
         rc->backoff_deadline = TIME_MAX; /* Prevent resetting backoff. */
         disconnect(rc, 0);
@@ -303,28 +318,42 @@ do_tx_work(struct rconn *rc)
     }
 }
 
-static void
-run_ACTIVE(struct rconn *rc)
+static unsigned int
+timeout_ACTIVE(const struct rconn *rc)
 {
     if (rc->probe_interval) {
         unsigned int base = MAX(rc->last_received, rc->state_entered);
         unsigned int arg = base + rc->probe_interval - rc->state_entered;
-        if (timeout(rc, arg)) {
-            queue_push_tail(&rc->txq, make_echo_request());
-            VLOG_DBG("%s: idle %u seconds, sending inactivity probe",
-                     rc->name, (unsigned int) (time(0) - base));
-            state_transition(rc, S_IDLE);
-            return;
-        }
+        return arg;
+    }
+    return UINT_MAX;
+}
+
+static void
+run_ACTIVE(struct rconn *rc)
+{
+    if (timed_out(rc)) {
+        unsigned int base = MAX(rc->last_received, rc->state_entered);
+        queue_push_tail(&rc->txq, make_echo_request());
+        VLOG_DBG("%s: idle %u seconds, sending inactivity probe",
+                 rc->name, (unsigned int) (time_now() - base));
+        state_transition(rc, S_IDLE);
+        return;
     }
 
     do_tx_work(rc);
 }
 
+static unsigned int
+timeout_IDLE(const struct rconn *rc)
+{
+    return rc->probe_interval;
+}
+
 static void
 run_IDLE(struct rconn *rc)
 {
-    if (timeout(rc, rc->probe_interval)) {
+    if (timed_out(rc)) {
         question_connectivity(rc);
         VLOG_ERR("%s: no response to inactivity probe after %u "
                  "seconds, disconnecting",
@@ -344,7 +373,6 @@ rconn_run(struct rconn *rc)
     int old_state;
     do {
         old_state = rc->state;
-        rc->min_timeout = UINT_MAX;
         switch (rc->state) {
 #define STATE(NAME, VALUE) case S_##NAME: run_##NAME(rc); break;
             STATES
@@ -360,14 +388,10 @@ rconn_run(struct rconn *rc)
 void
 rconn_run_wait(struct rconn *rc)
 {
-    if (rc->min_timeout != UINT_MAX) {
-        poll_timer_wait(sat_mul(rc->min_timeout, 1000));
+    unsigned int timeo = timeout(rc);
+    if (timeo != UINT_MAX) {
+        poll_timer_wait(sat_mul(timeo, 1000));
     }
-    /* Reset timeout to 1 second.  This will have no effect ordinarily, because
-     * rconn_run() will typically set it back to a higher value.  If, however,
-     * the caller fails to call rconn_run() before its next call to
-     * rconn_wait() we won't potentially block forever. */
-    rc->min_timeout = 1;
 
     if ((rc->state & (S_ACTIVE | S_IDLE)) && rc->txq.n) {
         vconn_wait(rc->vconn, WAIT_SEND);
@@ -384,7 +408,7 @@ rconn_recv(struct rconn *rc)
         struct buffer *buffer;
         int error = vconn_recv(rc->vconn, &buffer);
         if (!error) {
-            rc->last_received = time(0);
+            rc->last_received = time_now();
             if (rc->state == S_IDLE) {
                 state_transition(rc, S_ACTIVE);
             }
@@ -406,55 +430,59 @@ rconn_recv_wait(struct rconn *rc)
     }
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if at least 'txq_limit'
- * packets are already queued, otherwise a positive errno value. */
+/* Sends 'b' on 'rc'.  Returns 0 if successful (in which case 'b' is
+ * destroyed), or ENOTCONN if 'rc' is not currently connected (in which case
+ * the caller retains ownership of 'b').
+ *
+ * If 'n_queued' is non-null, then '*n_queued' will be incremented while the
+ * packet is in flight, then decremented when it has been sent (or discarded
+ * due to disconnection).  Because 'b' may be sent (or discarded) before this
+ * function returns, the caller may not be able to observe any change in
+ * '*n_queued'.
+ *
+ * There is no rconn_send_wait() function: an rconn has a send queue that it
+ * takes care of sending if you call rconn_run(), which will have the side
+ * effect of waking up poll_block(). */
 int
-do_send(struct rconn *rc, struct buffer *b, int txq_limit)
+rconn_send(struct rconn *rc, struct buffer *b, int *n_queued)
 {
     if (rc->vconn) {
-        if (rc->txq.n < txq_limit) {
-            queue_push_tail(&rc->txq, b);
-            if (rc->txq.n == 1) {
-                try_send(rc);
-            }
-            return 0;
-        } else {
-            return EAGAIN;
+        b->private = n_queued;
+        if (n_queued) {
+            ++*n_queued;
+        }
+        queue_push_tail(&rc->txq, b);
+        if (rc->txq.n == 1) {
+            try_send(rc);
         }
+        return 0;
     } else {
         return ENOTCONN;
     }
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if the send queue is
- * full, or ENOTCONN if 'rc' is not currently connected.
+/* Sends 'b' on 'rc'.  Increments '*n_queued' while the packet is in flight; it
+ * will be decremented when it has been sent (or discarded due to
+ * disconnection).  Returns 0 if successful, EAGAIN if '*n_queued' is already
+ * at least as large of 'queue_limit', or ENOTCONN if 'rc' is not currently
+ * connected.  Regardless of return value, 'b' is destroyed.
+ *
+ * Because 'b' may be sent (or discarded) before this function returns, the
+ * caller may not be able to observe any change in '*n_queued'.
  *
  * There is no rconn_send_wait() function: an rconn has a send queue that it
  * takes care of sending if you call rconn_run(), which will have the side
  * effect of waking up poll_block(). */
 int
-rconn_send(struct rconn *rc, struct buffer *b)
-{
-    return do_send(rc, b, rc->txq_limit);
-}
-
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if the send queue is
- * full, otherwise a positive errno value.
- *
- * Compared to rconn_send(), this function relaxes the queue limit, allowing
- * more packets than usual to be queued. */
-int
-rconn_force_send(struct rconn *rc, struct buffer *b)
+rconn_send_with_limit(struct rconn *rc, struct buffer *b,
+                      int *n_queued, int queue_limit)
 {
-    return do_send(rc, b, 2 * rc->txq_limit);
-}
-
-/* Returns true if 'rc''s send buffer is full,
- * false if it has room for at least one more packet. */
-bool
-rconn_is_full(const struct rconn *rc)
-{
-    return rc->txq.n >= rc->txq_limit;
+    int retval;
+    retval = *n_queued >= queue_limit ? EAGAIN : rconn_send(rc, b, n_queued);
+    if (retval) {
+        buffer_delete(b);
+    }
+    return retval;
 }
 
 /* Returns the total number of packets successfully sent on the underlying
@@ -493,7 +521,7 @@ rconn_is_connected(const struct rconn *rconn)
 int
 rconn_disconnected_duration(const struct rconn *rconn)
 {
-    return rconn_is_connected(rconn) ? 0 : time(0) - rconn->last_connected;
+    return rconn_is_connected(rconn) ? 0 : time_now() - rconn->last_received;
 }
 
 /* Returns the IP address of the peer, or 0 if the peer is not connected over
@@ -527,6 +555,7 @@ try_send(struct rconn *rc)
 {
     int retval = 0;
     struct buffer *next = rc->txq.head->next;
+    int *n_queued = rc->txq.head->private;
     retval = vconn_send(rc->vconn, rc->txq.head);
     if (retval) {
         if (retval != EAGAIN) {
@@ -535,6 +564,9 @@ try_send(struct rconn *rc)
         return retval;
     }
     rc->packets_sent++;
+    if (n_queued) {
+        --*n_queued;
+    }
     queue_advance_head(&rc->txq, next);
     return 0;
 }
@@ -546,7 +578,7 @@ static void
 disconnect(struct rconn *rc, int error)
 {
     if (rc->reliable) {
-        time_t now = time(0);
+        time_t now = time_now();
 
         if (rc->state & (S_CONNECTING | S_ACTIVE | S_IDLE)) {
             if (error > 0) {
@@ -561,7 +593,7 @@ disconnect(struct rconn *rc, int error)
             }
             vconn_close(rc->vconn);
             rc->vconn = NULL;
-            queue_clear(&rc->txq);
+            flush_queue(rc);
         }
 
         if (now >= rc->backoff_deadline) {
@@ -581,17 +613,43 @@ disconnect(struct rconn *rc, int error)
     }
 }
 
+/* Drops all the packets from 'rc''s send queue and decrements their queue
+ * counts. */
+static void
+flush_queue(struct rconn *rc)
+{
+    while (rc->txq.n > 0) {
+        struct buffer *b = queue_pop_head(&rc->txq);
+        int *n_queued = b->private;
+        if (n_queued) {
+            --*n_queued;
+        }
+        buffer_delete(b);
+    }
+}
+
 static unsigned int
 elapsed_in_this_state(const struct rconn *rc)
 {
-    return time(0) - rc->state_entered;
+    return time_now() - rc->state_entered;
+}
+
+static unsigned int
+timeout(const struct rconn *rc)
+{
+    switch (rc->state) {
+#define STATE(NAME, VALUE) case S_##NAME: return timeout_##NAME(rc);
+        STATES
+#undef STATE
+    default:
+        NOT_REACHED();
+    }
 }
 
 static bool
-timeout(struct rconn *rc, unsigned int secs)
+timed_out(const struct rconn *rc)
 {
-    rc->min_timeout = MIN(rc->min_timeout, secs);
-    return time(0) >= sat_add(rc->state_entered, secs);
+    return time_now() >= sat_add(rc->state_entered, timeout(rc));
 }
 
 static void
@@ -599,7 +657,7 @@ state_transition(struct rconn *rc, enum state state)
 {
     VLOG_DBG("%s: entering %s", rc->name, state_name(state));
     rc->state = state;
-    rc->state_entered = time(0);
+    rc->state_entered = time_now();
 }
 
 static unsigned int
@@ -618,7 +676,7 @@ sat_mul(unsigned int x, unsigned int y)
 static void
 question_connectivity(struct rconn *rc) 
 {
-    time_t now = time(0);
+    time_t now = time_now();
     if (now - rc->last_questioned > 60) {
         rc->questionable_connectivity = true;
         rc->last_questioned = now;