From 903d0940fba239b2e250e9ab5f4dfdf4ecaf4bc0 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Mon, 11 Aug 2008 16:24:24 -0700 Subject: [PATCH] Drop rconn's responsibility for limiting the tx queue. Now it helps clients track the number of in-flight messages, but lets the clients do the limiting themselves. This will come in handy for packet-in rate limiting (in an upcoming commit), in which we want to track in-flight packet-in messages separately from other in-flight messages. --- controller/controller.c | 2 +- include/buffer.h | 1 + include/queue.h | 1 + include/rconn.h | 14 +++-- lib/buffer.c | 1 + lib/learning-switch.c | 9 ++-- lib/queue.c | 11 ++++ lib/rconn.c | 112 +++++++++++++++++++++++----------------- secchan/secchan.c | 45 ++++++++-------- switch/datapath.c | 10 ++-- switch/switch.c | 2 +- 11 files changed, 120 insertions(+), 88 deletions(-) diff --git a/controller/controller.c b/controller/controller.c index 2176fa778..bc18748e4 100644 --- a/controller/controller.c +++ b/controller/controller.c @@ -197,7 +197,7 @@ main(int argc, char *argv[]) static void new_switch(struct switch_ *sw, struct vconn *vconn, const char *name) { - sw->rconn = rconn_new_from_vconn(name, 128, vconn); + sw->rconn = rconn_new_from_vconn(name, vconn); sw->lswitch = lswitch_create(sw->rconn, learn_macs, setup_flows ? max_idle : -1); } diff --git a/include/buffer.h b/include/buffer.h index e1718007b..086806945 100644 --- a/include/buffer.h +++ b/include/buffer.h @@ -51,6 +51,7 @@ struct buffer { void *l7; /* Application data. */ struct buffer *next; /* Next in a list of buffers. */ + void *private; /* Private pointer for use by owner. */ }; void buffer_use(struct buffer *, void *, size_t); diff --git a/include/queue.h b/include/queue.h index c2d0f22d1..113d5013a 100644 --- a/include/queue.h +++ b/include/queue.h @@ -46,5 +46,6 @@ void queue_destroy(struct queue *); void queue_clear(struct queue *); void queue_advance_head(struct queue *, struct buffer *next); void queue_push_tail(struct queue *, struct buffer *); +struct buffer *queue_pop_head(struct queue *); #endif /* queue.h */ diff --git a/include/rconn.h b/include/rconn.h index 228b8e894..26e4be720 100644 --- a/include/rconn.h +++ b/include/rconn.h @@ -53,12 +53,10 @@ struct vconn; -struct rconn *rconn_new(const char *name, int txq_limit, +struct rconn *rconn_new(const char *name, int inactivity_probe_interval, int max_backoff); -struct rconn *rconn_new_from_vconn(const char *name, int txq_limit, - struct vconn *); -struct rconn *rconn_create(int txq_limit, int inactivity_probe_interval, - int max_backoff); +struct rconn *rconn_new_from_vconn(const char *name, struct vconn *); +struct rconn *rconn_create(int inactivity_probe_interval, int max_backoff); int rconn_connect(struct rconn *, const char *name); void rconn_connect_unreliably(struct rconn *, const char *name, struct vconn *vconn); @@ -69,9 +67,9 @@ void rconn_run(struct rconn *); void rconn_run_wait(struct rconn *); struct buffer *rconn_recv(struct rconn *); void rconn_recv_wait(struct rconn *); -int rconn_send(struct rconn *, struct buffer *); -int rconn_force_send(struct rconn *, struct buffer *); -bool rconn_is_full(const struct rconn *); +int rconn_send(struct rconn *, struct buffer *, int *n_queued); +int rconn_send_with_limit(struct rconn *, struct buffer *, + int *n_queued, int queue_limit); unsigned int rconn_packets_sent(const struct rconn *); const char *rconn_get_name(const struct rconn *); diff --git a/lib/buffer.c b/lib/buffer.c index 3dfd41a29..47600e6ae 100644 --- a/lib/buffer.c +++ b/lib/buffer.c @@ -53,6 +53,7 @@ buffer_use(struct buffer *b, void *base, size_t allocated) b->size = 0; b->l2 = b->l3 = b->l4 = b->l7 = NULL; b->next = NULL; + b->private = NULL; } /* Initializes 'b' as a buffer with an initial capacity of 'size' bytes. */ diff --git a/lib/learning-switch.c b/lib/learning-switch.c index d41602d6b..27271fa2d 100644 --- a/lib/learning-switch.c +++ b/lib/learning-switch.c @@ -63,6 +63,9 @@ struct lswitch { uint64_t datapath_id; time_t last_features_request; struct mac_learning *ml; /* NULL to act as hub instead of switch. */ + + /* Number of outgoing queued packets on the rconn. */ + int n_queued; }; static void queue_tx(struct lswitch *, struct rconn *, struct buffer *); @@ -85,8 +88,7 @@ static void process_echo_request(struct lswitch *, struct rconn *, struct lswitch * lswitch_create(struct rconn *rconn, bool learn_macs, int max_idle) { - struct lswitch *sw = xmalloc(sizeof *sw); - memset(sw, 0, sizeof *sw); + struct lswitch *sw = xcalloc(1, sizeof *sw); sw->max_idle = max_idle; sw->datapath_id = 0; sw->last_features_request = time_now() - 1; @@ -182,7 +184,7 @@ send_features_request(struct lswitch *sw, struct rconn *rconn) static void queue_tx(struct lswitch *sw, struct rconn *rconn, struct buffer *b) { - int retval = rconn_send(rconn, b); + int retval = rconn_send_with_limit(rconn, b, &sw->n_queued, 10); if (retval) { if (retval == EAGAIN) { /* FIXME: ratelimit. */ @@ -193,7 +195,6 @@ queue_tx(struct lswitch *sw, struct rconn *rconn, struct buffer *b) /* FIXME: ratelimit. */ VLOG_WARN("%s: send: %s", rconn_get_name(rconn), strerror(retval)); } - buffer_delete(b); } } diff --git a/lib/queue.c b/lib/queue.c index 635e14426..f06c8e9a7 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -101,6 +101,17 @@ queue_push_tail(struct queue *q, struct buffer *b) check_queue(q); } +/* Removes the first buffer from 'q', which must not be empty, and returns + * it. The caller must free the buffer (with buffer_delete()) when it is no + * longer needed. */ +struct buffer * +queue_pop_head(struct queue *q) +{ + struct buffer *head = q->head; + queue_advance_head(q, head->next); + return head; +} + /* Checks the internal integrity of 'q'. For use in debugging. */ static void check_queue(struct queue *q) diff --git a/lib/rconn.c b/lib/rconn.c index 0093c3ab5..6bc371ad5 100644 --- a/lib/rconn.c +++ b/lib/rconn.c @@ -83,7 +83,6 @@ struct rconn { bool reliable; struct queue txq; - int txq_limit; int backoff; int max_backoff; @@ -119,31 +118,28 @@ static void state_transition(struct rconn *, enum state); static int try_send(struct rconn *); static int reconnect(struct rconn *); static void disconnect(struct rconn *, int error); +static void flush_queue(struct rconn *); static void question_connectivity(struct rconn *); /* Creates a new rconn, connects it (reliably) to 'name', and returns it. */ struct rconn * -rconn_new(const char *name, int txq_limit, int inactivity_probe_interval, - int max_backoff) +rconn_new(const char *name, int inactivity_probe_interval, int max_backoff) { - struct rconn *rc = rconn_create(txq_limit, inactivity_probe_interval, - max_backoff); + struct rconn *rc = rconn_create(inactivity_probe_interval, max_backoff); rconn_connect(rc, name); return rc; } /* Creates a new rconn, connects it (unreliably) to 'vconn', and returns it. */ struct rconn * -rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) +rconn_new_from_vconn(const char *name, struct vconn *vconn) { - struct rconn *rc = rconn_create(txq_limit, 60, 0); + struct rconn *rc = rconn_create(60, 0); rconn_connect_unreliably(rc, name, vconn); return rc; } /* Creates and returns a new rconn. - * - * 'txq_limit' is the maximum length of the send queue, in packets. * * 'probe_interval' is a number of seconds. If the interval passes once * without an OpenFlow message being received from the peer, the rconn sends @@ -156,7 +152,7 @@ rconn_new_from_vconn(const char *name, int txq_limit, struct vconn *vconn) * failure until it reaches 'max_backoff'. If 0 is specified, the default of * 60 seconds is used. */ struct rconn * -rconn_create(int txq_limit, int probe_interval, int max_backoff) +rconn_create(int probe_interval, int max_backoff) { struct rconn *rc = xcalloc(1, sizeof *rc); @@ -168,8 +164,6 @@ rconn_create(int txq_limit, int probe_interval, int max_backoff) rc->reliable = false; queue_init(&rc->txq); - assert(txq_limit > 0); - rc->txq_limit = txq_limit; rc->backoff = 0; rc->max_backoff = max_backoff ? max_backoff : 60; @@ -235,6 +229,7 @@ rconn_destroy(struct rconn *rc) if (rc) { free(rc->name); vconn_close(rc->vconn); + flush_queue(rc); queue_destroy(&rc->txq); free(rc); } @@ -435,55 +430,59 @@ rconn_recv_wait(struct rconn *rc) } } -/* Sends 'b' on 'rc'. Returns 0 if successful, EAGAIN if at least 'txq_limit' - * packets are already queued, otherwise a positive errno value. */ +/* Sends 'b' on 'rc'. Returns 0 if successful (in which case 'b' is + * destroyed), or ENOTCONN if 'rc' is not currently connected (in which case + * the caller retains ownership of 'b'). + * + * If 'n_queued' is non-null, then '*n_queued' will be incremented while the + * packet is in flight, then decremented when it has been sent (or discarded + * due to disconnection). Because 'b' may be sent (or discarded) before this + * function returns, the caller may not be able to observe any change in + * '*n_queued'. + * + * There is no rconn_send_wait() function: an rconn has a send queue that it + * takes care of sending if you call rconn_run(), which will have the side + * effect of waking up poll_block(). */ int -do_send(struct rconn *rc, struct buffer *b, int txq_limit) +rconn_send(struct rconn *rc, struct buffer *b, int *n_queued) { if (rc->vconn) { - if (rc->txq.n < txq_limit) { - queue_push_tail(&rc->txq, b); - if (rc->txq.n == 1) { - try_send(rc); - } - return 0; - } else { - return EAGAIN; + b->private = n_queued; + if (n_queued) { + ++*n_queued; } + queue_push_tail(&rc->txq, b); + if (rc->txq.n == 1) { + try_send(rc); + } + return 0; } else { return ENOTCONN; } } -/* Sends 'b' on 'rc'. Returns 0 if successful, EAGAIN if the send queue is - * full, or ENOTCONN if 'rc' is not currently connected. +/* Sends 'b' on 'rc'. Increments '*n_queued' while the packet is in flight; it + * will be decremented when it has been sent (or discarded due to + * disconnection). Returns 0 if successful, EAGAIN if '*n_queued' is already + * at least as large of 'queue_limit', or ENOTCONN if 'rc' is not currently + * connected. Regardless of return value, 'b' is destroyed. + * + * Because 'b' may be sent (or discarded) before this function returns, the + * caller may not be able to observe any change in '*n_queued'. * * There is no rconn_send_wait() function: an rconn has a send queue that it * takes care of sending if you call rconn_run(), which will have the side * effect of waking up poll_block(). */ int -rconn_send(struct rconn *rc, struct buffer *b) -{ - return do_send(rc, b, rc->txq_limit); -} - -/* Sends 'b' on 'rc'. Returns 0 if successful, EAGAIN if the send queue is - * full, otherwise a positive errno value. - * - * Compared to rconn_send(), this function relaxes the queue limit, allowing - * more packets than usual to be queued. */ -int -rconn_force_send(struct rconn *rc, struct buffer *b) +rconn_send_with_limit(struct rconn *rc, struct buffer *b, + int *n_queued, int queue_limit) { - return do_send(rc, b, 2 * rc->txq_limit); -} - -/* Returns true if 'rc''s send buffer is full, - * false if it has room for at least one more packet. */ -bool -rconn_is_full(const struct rconn *rc) -{ - return rc->txq.n >= rc->txq_limit; + int retval; + retval = *n_queued >= queue_limit ? EAGAIN : rconn_send(rc, b, n_queued); + if (retval) { + buffer_delete(b); + } + return retval; } /* Returns the total number of packets successfully sent on the underlying @@ -556,6 +555,7 @@ try_send(struct rconn *rc) { int retval = 0; struct buffer *next = rc->txq.head->next; + int *n_queued = rc->txq.head->private; retval = vconn_send(rc->vconn, rc->txq.head); if (retval) { if (retval != EAGAIN) { @@ -564,6 +564,9 @@ try_send(struct rconn *rc) return retval; } rc->packets_sent++; + if (n_queued) { + --*n_queued; + } queue_advance_head(&rc->txq, next); return 0; } @@ -590,7 +593,7 @@ disconnect(struct rconn *rc, int error) } vconn_close(rc->vconn); rc->vconn = NULL; - queue_clear(&rc->txq); + flush_queue(rc); } if (now >= rc->backoff_deadline) { @@ -610,6 +613,21 @@ disconnect(struct rconn *rc, int error) } } +/* Drops all the packets from 'rc''s send queue and decrements their queue + * counts. */ +static void +flush_queue(struct rconn *rc) +{ + while (rc->txq.n > 0) { + struct buffer *b = queue_pop_head(&rc->txq); + int *n_queued = b->private; + if (n_queued) { + --*n_queued; + } + buffer_delete(b); + } +} + static unsigned int elapsed_in_this_state(const struct rconn *rc) { diff --git a/secchan/secchan.c b/secchan/secchan.c index e4fc62f66..e4f283d1a 100644 --- a/secchan/secchan.c +++ b/secchan/secchan.c @@ -103,6 +103,7 @@ struct settings { struct half { struct rconn *rconn; struct buffer *rxbuf; + int n_txq; /* No. of packets queued for tx on 'rconn'. */ }; struct relay { @@ -195,11 +196,11 @@ main(int argc, char *argv[]) daemonize(); /* Connect to datapath. */ - local_rconn = rconn_create(1, 0, s.max_backoff); + local_rconn = rconn_create(0, s.max_backoff); rconn_connect(local_rconn, s.nl_name); /* Connect to controller. */ - remote_rconn = rconn_create(1, s.probe_interval, s.max_backoff); + remote_rconn = rconn_create(s.probe_interval, s.max_backoff); if (s.controller_name) { retval = rconn_connect(remote_rconn, s.controller_name); if (retval == EAFNOSUPPORT) { @@ -321,11 +322,11 @@ relay_accept(const struct settings *s, struct vconn *listen_vconn) } /* Create and return relay. */ - r1 = rconn_create(1, 0, 0); + r1 = rconn_create(0, 0); rconn_connect_unreliably(r1, nl_name_without_subscription, new_local); free(nl_name_without_subscription); - r2 = rconn_create(1, 0, 0); + r2 = rconn_create(0, 0); rconn_connect_unreliably(r2, "passive", new_remote); return relay_create(r1, r2, true); @@ -334,15 +335,9 @@ relay_accept(const struct settings *s, struct vconn *listen_vconn) static struct relay * relay_create(struct rconn *local, struct rconn *remote, bool is_mgmt_conn) { - struct relay *r; - int i; - - r = xmalloc(sizeof *r); + struct relay *r = xcalloc(1, sizeof *r); r->halves[HALF_LOCAL].rconn = local; r->halves[HALF_REMOTE].rconn = remote; - for (i = 0; i < 2; i++) { - r->halves[i].rxbuf = NULL; - } r->is_mgmt_conn = is_mgmt_conn; return r; } @@ -379,8 +374,9 @@ relay_run(struct relay *r, const struct hook hooks[], size_t n_hooks) } } - if (this->rxbuf) { - int retval = rconn_send(peer->rconn, this->rxbuf); + if (this->rxbuf && !this->n_txq) { + int retval = rconn_send(peer->rconn, this->rxbuf, + &this->n_txq); if (retval != EAGAIN) { if (!retval) { progress = true; @@ -443,14 +439,13 @@ struct in_band_data { struct mac_learning *ml; struct netdev *of_device; uint8_t mac[ETH_ADDR_LEN]; + int n_queued; }; static void -queue_tx(struct rconn *rc, struct buffer *b) +queue_tx(struct rconn *rc, struct in_band_data *in_band, struct buffer *b) { - if (rconn_force_send(rc, b)) { - buffer_delete(b); - } + rconn_send_with_limit(rc, b, &in_band->n_queued, 10); } static const uint8_t * @@ -567,8 +562,8 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_) && in_port == mac_learning_lookup(in_band->ml, controller_mac)) { /* Drop controller traffic that arrives on the controller port. */ - queue_tx(rc, make_add_flow(&flow, ntohl(opi->buffer_id), - in_band->s->max_idle, 0)); + queue_tx(rc, in_band, make_add_flow(&flow, ntohl(opi->buffer_id), + in_band->s->max_idle, 0)); return true; } else { return false; @@ -576,12 +571,14 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_) if (out_port != OFPP_FLOOD) { /* The output port is known, so add a new flow. */ - queue_tx(rc, make_add_simple_flow(&flow, ntohl(opi->buffer_id), - out_port, in_band->s->max_idle)); + queue_tx(rc, in_band, + make_add_simple_flow(&flow, ntohl(opi->buffer_id), + out_port, in_band->s->max_idle)); /* If the switch didn't buffer the packet, we need to send a copy. */ if (ntohl(opi->buffer_id) == UINT32_MAX) { - queue_tx(rc, make_unbuffered_packet_out(&pkt, in_port, out_port)); + queue_tx(rc, in_band, + make_unbuffered_packet_out(&pkt, in_port, out_port)); } } else { /* We don't know that MAC. Send along the packet without setting up a @@ -593,7 +590,7 @@ in_band_packet_cb(struct relay *r, int half, void *in_band_) b = make_buffered_packet_out(ntohl(opi->buffer_id), in_port, out_port); } - queue_tx(rc, b); + queue_tx(rc, in_band, b); } return true; } @@ -604,7 +601,7 @@ in_band_hook_create(const struct settings *s) struct in_band_data *in_band; int retval; - in_band = xmalloc(sizeof *in_band); + in_band = xcalloc(1, sizeof *in_band); in_band->s = s; in_band->ml = mac_learning_create(); retval = netdev_open(s->of_name, NETDEV_ETH_TYPE_NONE, diff --git a/switch/datapath.c b/switch/datapath.c index 31b9af2f5..1d10ed421 100644 --- a/switch/datapath.c +++ b/switch/datapath.c @@ -90,6 +90,8 @@ struct sender { struct remote { struct list node; struct rconn *rconn; +#define TXQ_LIMIT 128 /* Max number of packets to queue for tx. */ + int n_txq; /* Number of packets queued for tx on rconn. */ /* Support for reliable, multi-message replies to requests. * @@ -331,7 +333,7 @@ dp_run(struct datapath *dp) } break; } - remote_create(dp, rconn_new_from_vconn("passive", 128, new_vconn)); + remote_create(dp, rconn_new_from_vconn("passive", new_vconn)); } } } @@ -367,7 +369,7 @@ remote_run(struct datapath *dp, struct remote *r) } buffer_delete(buffer); } else { - if (!rconn_is_full(r->rconn)) { + if (r->n_txq < TXQ_LIMIT) { int error = r->cb_dump(dp, r->cb_aux); if (error <= 0) { if (error) { @@ -576,7 +578,9 @@ send_openflow_buffer(struct datapath *dp, struct buffer *buffer, int retval; update_openflow_length(buffer); - retval = rconn_send(rconn, buffer); + retval = (remote->n_txq < TXQ_LIMIT + ? rconn_send(rconn, buffer, &remote->n_txq) + : EAGAIN); if (retval) { VLOG_WARN("send to %s failed: %s", rconn_get_name(rconn), strerror(retval)); diff --git a/switch/switch.c b/switch/switch.c index 8224089d3..3a02f6460 100644 --- a/switch/switch.c +++ b/switch/switch.c @@ -86,7 +86,7 @@ main(int argc, char *argv[]) fatal(0, "missing controller argument; use --help for usage"); } - rconn = rconn_create(128, 60, max_backoff); + rconn = rconn_create(60, max_backoff); error = rconn_connect(rconn, argv[optind]); if (error == EAFNOSUPPORT) { fatal(0, "no support for %s vconn", argv[optind]); -- 2.43.0