Drop rconn's responsibility for limiting the tx queue.
authorBen Pfaff <blp@nicira.com>
Mon, 11 Aug 2008 23:24:24 +0000 (16:24 -0700)
committerBen Pfaff <blp@nicira.com>
Tue, 12 Aug 2008 22:21:30 +0000 (15:21 -0700)
Now it helps clients track the number of in-flight messages, but lets the
clients do the limiting themselves.  This will come in handy for packet-in
rate limiting (in an upcoming commit), in which we want to track in-flight
packet-in messages separately from other in-flight messages.

controller/controller.c
include/buffer.h
include/queue.h
include/rconn.h
lib/buffer.c
lib/learning-switch.c
lib/queue.c
lib/rconn.c
secchan/secchan.c
switch/datapath.c
switch/switch.c

index 2176fa7..bc18748 100644 (file)
@@ -197,7 +197,7 @@ main(int argc, char *argv[])
 static void
 new_switch(struct switch_ *sw, struct vconn *vconn, const char *name)
 {
-    sw->rconn = rconn_new_from_vconn(name, 128, vconn);
+    sw->rconn = rconn_new_from_vconn(name, vconn);
     sw->lswitch = lswitch_create(sw->rconn, learn_macs,
                                  setup_flows ? max_idle : -1);
 }
index e171800..0868069 100644 (file)
@@ -51,6 +51,7 @@ struct buffer {
     void *l7;                   /* Application data. */
 
     struct buffer *next;        /* Next in a list of buffers. */
+    void *private;              /* Private pointer for use by owner. */
 };
 
 void buffer_use(struct buffer *, void *, size_t);
index c2d0f22..113d501 100644 (file)
@@ -46,5 +46,6 @@ void queue_destroy(struct queue *);
 void queue_clear(struct queue *);
 void queue_advance_head(struct queue *, struct buffer *next);
 void queue_push_tail(struct queue *, struct buffer *);
+struct buffer *queue_pop_head(struct queue *);
 
 #endif /* queue.h */
index 228b8e8..26e4be7 100644 (file)
 
 struct vconn;
 
-struct rconn *rconn_new(const char *name, int txq_limit,
+struct rconn *rconn_new(const char *name, 
                         int inactivity_probe_interval, int max_backoff);
-struct rconn *rconn_new_from_vconn(const char *name, int txq_limit,
-                                   struct vconn *);
-struct rconn *rconn_create(int txq_limit, int inactivity_probe_interval,
-                           int max_backoff);
+struct rconn *rconn_new_from_vconn(const char *name, struct vconn *);
+struct rconn *rconn_create(int inactivity_probe_interval, int max_backoff);
 int rconn_connect(struct rconn *, const char *name);
 void rconn_connect_unreliably(struct rconn *,
                               const char *name, struct vconn *vconn);
@@ -69,9 +67,9 @@ void rconn_run(struct rconn *);
 void rconn_run_wait(struct rconn *);
 struct buffer *rconn_recv(struct rconn *);
 void rconn_recv_wait(struct rconn *);
-int rconn_send(struct rconn *, struct buffer *);
-int rconn_force_send(struct rconn *, struct buffer *);
-bool rconn_is_full(const struct rconn *);
+int rconn_send(struct rconn *, struct buffer *, int *n_queued);
+int rconn_send_with_limit(struct rconn *, struct buffer *,
+                          int *n_queued, int queue_limit);
 unsigned int rconn_packets_sent(const struct rconn *);
 
 const char *rconn_get_name(const struct rconn *);
index 3dfd41a..47600e6 100644 (file)
@@ -53,6 +53,7 @@ buffer_use(struct buffer *b, void *base, size_t allocated)
     b->size = 0;
     b->l2 = b->l3 = b->l4 = b->l7 = NULL;
     b->next = NULL;
+    b->private = NULL;
 }
 
 /* Initializes 'b' as a buffer with an initial capacity of 'size' bytes. */
index d41602d..27271fa 100644 (file)
@@ -63,6 +63,9 @@ struct lswitch {
     uint64_t datapath_id;
     time_t last_features_request;
     struct mac_learning *ml;    /* NULL to act as hub instead of switch. */
+
+    /* Number of outgoing queued packets on the rconn. */
+    int n_queued;
 };
 
 static void queue_tx(struct lswitch *, struct rconn *, struct buffer *);
@@ -85,8 +88,7 @@ static void process_echo_request(struct lswitch *, struct rconn *,
 struct lswitch *
 lswitch_create(struct rconn *rconn, bool learn_macs, int max_idle)
 {
-    struct lswitch *sw = xmalloc(sizeof *sw);
-    memset(sw, 0, sizeof *sw);
+    struct lswitch *sw = xcalloc(1, sizeof *sw);
     sw->max_idle = max_idle;
     sw->datapath_id = 0;
     sw->last_features_request = time_now() - 1;
@@ -182,7 +184,7 @@ send_features_request(struct lswitch *sw, struct rconn *rconn)
 static void
 queue_tx(struct lswitch *sw, struct rconn *rconn, struct buffer *b)
 {
-    int retval = rconn_send(rconn, b);
+    int retval = rconn_send_with_limit(rconn, b, &sw->n_queued, 10);
     if (retval) {
         if (retval == EAGAIN) {
             /* FIXME: ratelimit. */
@@ -193,7 +195,6 @@ queue_tx(struct lswitch *sw, struct rconn *rconn, struct buffer *b)
             /* FIXME: ratelimit. */
             VLOG_WARN("%s: send: %s", rconn_get_name(rconn), strerror(retval));
         }
-        buffer_delete(b);
     }
 }
 
index 635e144..f06c8e9 100644 (file)
@@ -101,6 +101,17 @@ queue_push_tail(struct queue *q, struct buffer *b)
     check_queue(q);
 }
 
+/* Removes the first buffer from 'q', which must not be empty, and returns
+ * it.  The caller must free the buffer (with buffer_delete()) when it is no
+ * longer needed. */
+struct buffer *
+queue_pop_head(struct queue *q)
+{
+    struct buffer *head = q->head;
+    queue_advance_head(q, head->next);
+    return head;
+}
+
 /* Checks the internal integrity of 'q'.  For use in debugging. */
 static void
 check_queue(struct queue *q)
index 0093c3a..6bc371a 100644 (file)
@@ -83,7 +83,6 @@ struct rconn {
     bool reliable;
 
     struct queue txq;
-    int txq_limit;
 
     int backoff;
     int max_backoff;
@@ -119,31 +118,28 @@ static void state_transition(struct rconn *, enum state);
 static int try_send(struct rconn *);
 static int reconnect(struct rconn *);
 static void disconnect(struct rconn *, int error);
+static void flush_queue(struct rconn *);
 static void question_connectivity(struct rconn *);
 
 /* Creates a new rconn, connects it (reliably) to 'name', and returns it. */
 struct rconn *
-rconn_new(const char *name, int txq_limit, int inactivity_probe_interval,
-          int max_backoff) 
+rconn_new(const char *name, int inactivity_probe_interval, int max_backoff)
 {
-    struct rconn *rc = rconn_create(txq_limit, inactivity_probe_interval,
-                                    max_backoff);
+    struct rconn *rc = rconn_create(inactivity_probe_interval, max_backoff);
     rconn_connect(rc, name);
     return rc;
 }
 
 /* Creates a new rconn, connects it (unreliably) to 'vconn', and returns it. */
 struct rconn *
-rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) 
+rconn_new_from_vconn(const char *name, struct vconn *vconn) 
 {
-    struct rconn *rc = rconn_create(txq_limit, 60, 0);
+    struct rconn *rc = rconn_create(60, 0);
     rconn_connect_unreliably(rc, name, vconn);
     return rc;
 }
 
 /* Creates and returns a new rconn.
- *
- * 'txq_limit' is the maximum length of the send queue, in packets.
  *
  * 'probe_interval' is a number of seconds.  If the interval passes once
  * without an OpenFlow message being received from the peer, the rconn sends
@@ -156,7 +152,7 @@ rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn)
  * failure until it reaches 'max_backoff'.  If 0 is specified, the default of
  * 60 seconds is used. */
 struct rconn *
-rconn_create(int txq_limit, int probe_interval, int max_backoff)
+rconn_create(int probe_interval, int max_backoff)
 {
     struct rconn *rc = xcalloc(1, sizeof *rc);
 
@@ -168,8 +164,6 @@ rconn_create(int txq_limit, int probe_interval, int max_backoff)
     rc->reliable = false;
 
     queue_init(&rc->txq);
-    assert(txq_limit > 0);
-    rc->txq_limit = txq_limit;
 
     rc->backoff = 0;
     rc->max_backoff = max_backoff ? max_backoff : 60;
@@ -235,6 +229,7 @@ rconn_destroy(struct rconn *rc)
     if (rc) {
         free(rc->name);
         vconn_close(rc->vconn);
+        flush_queue(rc);
         queue_destroy(&rc->txq);
         free(rc);
     }
@@ -435,55 +430,59 @@ rconn_recv_wait(struct rconn *rc)
     }
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if at least 'txq_limit'
- * packets are already queued, otherwise a positive errno value. */
+/* Sends 'b' on 'rc'.  Returns 0 if successful (in which case 'b' is
+ * destroyed), or ENOTCONN if 'rc' is not currently connected (in which case
+ * the caller retains ownership of 'b').
+ *
+ * If 'n_queued' is non-null, then '*n_queued' will be incremented while the
+ * packet is in flight, then decremented when it has been sent (or discarded
+ * due to disconnection).  Because 'b' may be sent (or discarded) before this
+ * function returns, the caller may not be able to observe any change in
+ * '*n_queued'.
+ *
+ * There is no rconn_send_wait() function: an rconn has a send queue that it
+ * takes care of sending if you call rconn_run(), which will have the side
+ * effect of waking up poll_block(). */
 int
-do_send(struct rconn *rc, struct buffer *b, int txq_limit)
+rconn_send(struct rconn *rc, struct buffer *b, int *n_queued)
 {
     if (rc->vconn) {
-        if (rc->txq.n < txq_limit) {
-            queue_push_tail(&rc->txq, b);
-            if (rc->txq.n == 1) {
-                try_send(rc);
-            }
-            return 0;
-        } else {
-            return EAGAIN;
+        b->private = n_queued;
+        if (n_queued) {
+            ++*n_queued;
         }
+        queue_push_tail(&rc->txq, b);
+        if (rc->txq.n == 1) {
+            try_send(rc);
+        }
+        return 0;
     } else {
         return ENOTCONN;
     }
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if the send queue is
- * full, or ENOTCONN if 'rc' is not currently connected.
+/* Sends 'b' on 'rc'.  Increments '*n_queued' while the packet is in flight; it
+ * will be decremented when it has been sent (or discarded due to
+ * disconnection).  Returns 0 if successful, EAGAIN if '*n_queued' is already
+ * at least as large of 'queue_limit', or ENOTCONN if 'rc' is not currently
+ * connected.  Regardless of return value, 'b' is destroyed.
+ *
+ * Because 'b' may be sent (or discarded) before this function returns, the
+ * caller may not be able to observe any change in '*n_queued'.
  *
  * There is no rconn_send_wait() function: an rconn has a send queue that it
  * takes care of sending if you call rconn_run(), which will have the side
  * effect of waking up poll_block(). */
 int
-rconn_send(struct rconn *rc, struct buffer *b)
-{
-    return do_send(rc, b, rc->txq_limit);
-}
-
-/* Sends 'b' on 'rc'.  Returns 0 if successful, EAGAIN if the send queue is
- * full, otherwise a positive errno value.
- *
- * Compared to rconn_send(), this function relaxes the queue limit, allowing
- * more packets than usual to be queued. */
-int
-rconn_force_send(struct rconn *rc, struct buffer *b)
+rconn_send_with_limit(struct rconn *rc, struct buffer *b,
+                      int *n_queued, int queue_limit)
 {
-    return do_send(rc, b, 2 * rc->txq_limit);
-}
-
-/* Returns true if 'rc''s send buffer is full,
- * false if it has room for at least one more packet. */
-bool
-rconn_is_full(const struct rconn *rc)
-{
-    return rc->txq.n >= rc->txq_limit;
+    int retval;
+    retval = *n_queued >= queue_limit ? EAGAIN : rconn_send(rc, b, n_queued);
+    if (retval) {
+        buffer_delete(b);
+    }
+    return retval;
 }
 
 /* Returns the total number of packets successfully sent on the underlying
@@ -556,6 +555,7 @@ try_send(struct rconn *rc)
 {
     int retval = 0;
     struct buffer *next = rc->txq.head->next;
+    int *n_queued = rc->txq.head->private;
     retval = vconn_send(rc->vconn, rc->txq.head);
     if (retval) {
         if (retval != EAGAIN) {
@@ -564,6 +564,9 @@ try_send(struct rconn *rc)
         return retval;
     }
     rc->packets_sent++;
+    if (n_queued) {
+        --*n_queued;
+    }
     queue_advance_head(&rc->txq, next);
     return 0;
 }
@@ -590,7 +593,7 @@ disconnect(struct rconn *rc, int error)
             }
             vconn_close(rc->vconn);
             rc->vconn = NULL;
-            queue_clear(&rc->txq);
+            flush_queue(rc);
         }
 
         if (now >= rc->backoff_deadline) {
@@ -610,6 +613,21 @@ disconnect(struct rconn *rc, int error)
     }
 }
 
+/* Drops all the packets from 'rc''s send queue and decrements their queue
+ * counts. */
+static void
+flush_queue(struct rconn *rc)
+{
+    while (rc->txq.n > 0) {
+        struct buffer *b = queue_pop_head(&rc->txq);
+        int *n_queued = b->private;
+        if (n_queued) {
+            --*n_queued;
+        }
+        buffer_delete(b);
+    }
+}
+
 static unsigned int
 elapsed_in_this_state(const struct rconn *rc)
 {
index e4fc62f..e4f283d 100644 (file)
@@ -103,6 +103,7 @@ struct settings {
 struct half {
     struct rconn *rconn;
     struct buffer *rxbuf;
+    int n_txq;                  /* No. of packets queued for tx on 'rconn'. */
 };
 
 struct relay {
@@ -195,11 +196,11 @@ main(int argc, char *argv[])
     daemonize();
 
     /* Connect to datapath. */
-    local_rconn = rconn_create(1, 0, s.max_backoff);
+    local_rconn = rconn_create(0, s.max_backoff);
     rconn_connect(local_rconn, s.nl_name);
 
     /* Connect to controller. */
-    remote_rconn = rconn_create(1, s.probe_interval, s.max_backoff);
+    remote_rconn = rconn_create(s.probe_interval, s.max_backoff);
     if (s.controller_name) {
         retval = rconn_connect(remote_rconn, s.controller_name);
         if (retval == EAFNOSUPPORT) {
@@ -321,11 +322,11 @@ relay_accept(const struct settings *s, struct vconn *listen_vconn)
     }
 
     /* Create and return relay. */
-    r1 = rconn_create(1, 0, 0);
+    r1 = rconn_create(0, 0);
     rconn_connect_unreliably(r1, nl_name_without_subscription, new_local);
     free(nl_name_without_subscription);
 
-    r2 = rconn_create(1, 0, 0);
+    r2 = rconn_create(0, 0);
     rconn_connect_unreliably(r2, "passive", new_remote);
 
     return relay_create(r1, r2, true);
@@ -334,15 +335,9 @@ relay_accept(const struct settings *s, struct vconn *listen_vconn)
 static struct relay *
 relay_create(struct rconn *local, struct rconn *remote, bool is_mgmt_conn)
 {
-    struct relay *r;
-    int i;
-
-    r = xmalloc(sizeof *r);
+    struct relay *r = xcalloc(1, sizeof *r);
     r->halves[HALF_LOCAL].rconn = local;
     r->halves[HALF_REMOTE].rconn = remote;
-    for (i = 0; i < 2; i++) {
-        r->halves[i].rxbuf = NULL;
-    }
     r->is_mgmt_conn = is_mgmt_conn;
     return r;
 }
@@ -379,8 +374,9 @@ relay_run(struct relay *r, const struct hook hooks[], size_t n_hooks)
                 }
             }
 
-            if (this->rxbuf) {
-                int retval = rconn_send(peer->rconn, this->rxbuf);
+            if (this->rxbuf && !this->n_txq) {
+                int retval = rconn_send(peer->rconn, this->rxbuf,
+                                        &this->n_txq);
                 if (retval != EAGAIN) {
                     if (!retval) {
                         progress = true;
@@ -443,14 +439,13 @@ struct in_band_data {
     struct mac_learning *ml;
     struct netdev *of_device;
     uint8_t mac[ETH_ADDR_LEN];
+    int n_queued;
 };
 
 static void
-queue_tx(struct rconn *rc, struct buffer *b)
+queue_tx(struct rconn *rc, struct in_band_data *in_band, struct buffer *b)
 {
-    if (rconn_force_send(rc, b)) {
-        buffer_delete(b);
-    }
+    rconn_send_with_limit(rc, b, &in_band->n_queued, 10);
 }
 
 static const uint8_t *
@@ -567,8 +562,8 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_)
                && in_port == mac_learning_lookup(in_band->ml,
                                                  controller_mac)) {
         /* Drop controller traffic that arrives on the controller port. */
-        queue_tx(rc, make_add_flow(&flow, ntohl(opi->buffer_id),
-                                   in_band->s->max_idle, 0));
+        queue_tx(rc, in_band, make_add_flow(&flow, ntohl(opi->buffer_id),
+                                            in_band->s->max_idle, 0));
         return true;
     } else {
         return false;
@@ -576,12 +571,14 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_)
 
     if (out_port != OFPP_FLOOD) {
         /* The output port is known, so add a new flow. */
-        queue_tx(rc, make_add_simple_flow(&flow, ntohl(opi->buffer_id),
-                                          out_port, in_band->s->max_idle));
+        queue_tx(rc, in_band,
+                 make_add_simple_flow(&flow, ntohl(opi->buffer_id),
+                                      out_port, in_band->s->max_idle));
 
         /* If the switch didn't buffer the packet, we need to send a copy. */
         if (ntohl(opi->buffer_id) == UINT32_MAX) {
-            queue_tx(rc, make_unbuffered_packet_out(&pkt, in_port, out_port));
+            queue_tx(rc, in_band,
+                     make_unbuffered_packet_out(&pkt, in_port, out_port));
         }
     } else {
         /* We don't know that MAC.  Send along the packet without setting up a
@@ -593,7 +590,7 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_)
             b = make_buffered_packet_out(ntohl(opi->buffer_id),
                                          in_port, out_port);
         }
-        queue_tx(rc, b);
+        queue_tx(rc, in_band, b);
     }
     return true;
 }
@@ -604,7 +601,7 @@ in_band_hook_create(const struct settings *s)
     struct in_band_data *in_band;
     int retval;
 
-    in_band = xmalloc(sizeof *in_band);
+    in_band = xcalloc(1, sizeof *in_band);
     in_band->s = s;
     in_band->ml = mac_learning_create();
     retval = netdev_open(s->of_name, NETDEV_ETH_TYPE_NONE,
index 31b9af2..1d10ed4 100644 (file)
@@ -90,6 +90,8 @@ struct sender {
 struct remote {
     struct list node;
     struct rconn *rconn;
+#define TXQ_LIMIT 128           /* Max number of packets to queue for tx. */
+    int n_txq;                  /* Number of packets queued for tx on rconn. */
 
     /* Support for reliable, multi-message replies to requests.
      *
@@ -331,7 +333,7 @@ dp_run(struct datapath *dp)
                 }
                 break;
             }
-            remote_create(dp, rconn_new_from_vconn("passive", 128, new_vconn));
+            remote_create(dp, rconn_new_from_vconn("passive", new_vconn));
         }
     }
 }
@@ -367,7 +369,7 @@ remote_run(struct datapath *dp, struct remote *r)
             }
             buffer_delete(buffer); 
         } else {
-            if (!rconn_is_full(r->rconn)) {
+            if (r->n_txq < TXQ_LIMIT) {
                 int error = r->cb_dump(dp, r->cb_aux);
                 if (error <= 0) {
                     if (error) {
@@ -576,7 +578,9 @@ send_openflow_buffer(struct datapath *dp, struct buffer *buffer,
     int retval;
 
     update_openflow_length(buffer);
-    retval = rconn_send(rconn, buffer);
+    retval = (remote->n_txq < TXQ_LIMIT
+              ? rconn_send(rconn, buffer, &remote->n_txq)
+              : EAGAIN);
     if (retval) {
         VLOG_WARN("send to %s failed: %s",
                   rconn_get_name(rconn), strerror(retval));
index 8224089..3a02f64 100644 (file)
@@ -86,7 +86,7 @@ main(int argc, char *argv[])
         fatal(0, "missing controller argument; use --help for usage");
     }
 
-    rconn = rconn_create(128, 60, max_backoff);
+    rconn = rconn_create(60, max_backoff);
     error = rconn_connect(rconn, argv[optind]);
     if (error == EAFNOSUPPORT) {
         fatal(0, "no support for %s vconn", argv[optind]);