X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=lib%2Frconn.c;h=fabd61eea6bad7021d6220249dfe422cedd0a8ae;hb=84f7d34079fcc68bfa335adec9a4fbcb75d52bd4;hp=26e2e6146785b048a78493def2165e98ce805d2f;hpb=686e34c8b7406e3a6dabe78eeab52d51780ee120;p=sliver-openvswitch.git diff --git a/lib/rconn.c b/lib/rconn.c index 26e2e6146..fabd61eea 100644 --- a/lib/rconn.c +++ b/lib/rconn.c @@ -38,9 +38,9 @@ #include #include #include -#include "buffer.h" +#include "ofpbuf.h" +#include "openflow.h" #include "poll-loop.h" -#include "ofp-print.h" #include "sat-math.h" #include "timeval.h" #include "util.h" @@ -83,7 +83,7 @@ struct rconn { char *name; bool reliable; - struct queue txq; + struct ofp_queue txq; int backoff; int max_backoff; @@ -114,6 +114,11 @@ struct rconn { * an echo request as an inactivity probe packet. We should receive back * a response. */ int probe_interval; /* Secs of inactivity before sending probe. */ + + /* Messages sent or received are copied to the monitor connections. */ +#define MAX_MONITORS 8 + struct vconn *monitors[8]; + size_t n_monitors; }; static unsigned int elapsed_in_this_state(const struct rconn *); @@ -125,6 +130,7 @@ static int reconnect(struct rconn *); static void disconnect(struct rconn *, int error); static void flush_queue(struct rconn *); static void question_connectivity(struct rconn *); +static void copy_to_monitor(struct rconn *, const struct ofpbuf *); /* Creates a new rconn, connects it (reliably) to 'name', and returns it. */ struct rconn * @@ -189,6 +195,8 @@ rconn_create(int probe_interval, int max_backoff) rc->probe_interval = probe_interval ? MAX(5, probe_interval) : 0; + rc->n_monitors = 0; + return rc; } @@ -265,7 +273,7 @@ reconnect(struct rconn *rc) VLOG_WARN("%s: connecting...", rc->name); rc->n_attempted_connections++; - retval = vconn_open(rc->name, &rc->vconn); + retval = vconn_open(rc->name, OFP_VERSION, &rc->vconn); if (!retval) { rc->backoff_deadline = time_now() + rc->backoff; state_transition(rc, S_CONNECTING); @@ -303,13 +311,8 @@ run_CONNECTING(struct rconn *rc) if (!retval) { VLOG_WARN("%s: connected", rc->name); rc->n_successful_connections++; - if (vconn_is_passive(rc->vconn)) { - error(0, "%s: passive vconn not supported", rc->name); - state_transition(rc, S_VOID); - } else { - state_transition(rc, S_ACTIVE); - rc->last_connected = rc->state_entered; - } + state_transition(rc, S_ACTIVE); + rc->last_connected = rc->state_entered; } else if (retval != EAGAIN) { VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(retval)); disconnect(rc, retval); @@ -421,14 +424,15 @@ rconn_run_wait(struct rconn *rc) /* Attempts to receive a packet from 'rc'. If successful, returns the packet; * otherwise, returns a null pointer. The caller is responsible for freeing - * the packet (with buffer_delete()). */ -struct buffer * + * the packet (with ofpbuf_delete()). */ +struct ofpbuf * rconn_recv(struct rconn *rc) { if (rc->state & (S_ACTIVE | S_IDLE)) { - struct buffer *buffer; + struct ofpbuf *buffer; int error = vconn_recv(rc->vconn, &buffer); if (!error) { + copy_to_monitor(rc, buffer); rc->last_received = time_now(); rc->packets_received++; if (rc->state == S_IDLE) { @@ -466,9 +470,10 @@ rconn_recv_wait(struct rconn *rc) * takes care of sending if you call rconn_run(), which will have the side * effect of waking up poll_block(). */ int -rconn_send(struct rconn *rc, struct buffer *b, int *n_queued) +rconn_send(struct rconn *rc, struct ofpbuf *b, int *n_queued) { if (rconn_is_connected(rc)) { + copy_to_monitor(rc, b); b->private = n_queued; if (n_queued) { ++*n_queued; @@ -486,7 +491,7 @@ rconn_send(struct rconn *rc, struct buffer *b, int *n_queued) /* Sends 'b' on 'rc'. Increments '*n_queued' while the packet is in flight; it * will be decremented when it has been sent (or discarded due to * disconnection). Returns 0 if successful, EAGAIN if '*n_queued' is already - * at least as large of 'queue_limit', or ENOTCONN if 'rc' is not currently + * at least as large as 'queue_limit', or ENOTCONN if 'rc' is not currently * connected. Regardless of return value, 'b' is destroyed. * * Because 'b' may be sent (or discarded) before this function returns, the @@ -496,13 +501,13 @@ rconn_send(struct rconn *rc, struct buffer *b, int *n_queued) * takes care of sending if you call rconn_run(), which will have the side * effect of waking up poll_block(). */ int -rconn_send_with_limit(struct rconn *rc, struct buffer *b, +rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b, int *n_queued, int queue_limit) { int retval; retval = *n_queued >= queue_limit ? EAGAIN : rconn_send(rc, b, n_queued); if (retval) { - buffer_delete(b); + ofpbuf_delete(b); } return retval; } @@ -516,6 +521,21 @@ rconn_packets_sent(const struct rconn *rc) return rc->packets_sent; } +/* Adds 'vconn' to 'rc' as a monitoring connection, to which all messages sent + * and received on 'rconn' will be copied. 'rc' takes ownership of 'vconn'. */ +void +rconn_add_monitor(struct rconn *rc, struct vconn *vconn) +{ + if (rc->n_monitors < ARRAY_SIZE(rc->monitors)) { + VLOG_WARN("new monitor connection from %s", vconn_get_name(vconn)); + rc->monitors[rc->n_monitors++] = vconn; + } else { + VLOG_DBG("too many monitor connections, discarding %s", + vconn_get_name(vconn)); + vconn_close(vconn); + } +} + /* Returns 'rc''s name (the 'name' argument passed to rconn_new()). */ const char * rconn_get_name(const struct rconn *rc) @@ -645,7 +665,7 @@ static int try_send(struct rconn *rc) { int retval = 0; - struct buffer *next = rc->txq.head->next; + struct ofpbuf *next = rc->txq.head->next; int *n_queued = rc->txq.head->private; retval = vconn_send(rc->vconn, rc->txq.head); if (retval) { @@ -713,12 +733,12 @@ flush_queue(struct rconn *rc) return; } while (rc->txq.n > 0) { - struct buffer *b = queue_pop_head(&rc->txq); + struct ofpbuf *b = queue_pop_head(&rc->txq); int *n_queued = b->private; if (n_queued) { --*n_queued; } - buffer_delete(b); + ofpbuf_delete(b); } poll_immediate_wake(); } @@ -767,3 +787,31 @@ question_connectivity(struct rconn *rc) rc->last_questioned = now; } } + +static void +copy_to_monitor(struct rconn *rc, const struct ofpbuf *b) +{ + struct ofpbuf *clone = NULL; + int retval; + size_t i; + + for (i = 0; i < rc->n_monitors; ) { + struct vconn *vconn = rc->monitors[i]; + + if (!clone) { + clone = ofpbuf_clone(b); + } + retval = vconn_send(vconn, clone); + if (!retval) { + clone = NULL; + } else if (retval != EAGAIN) { + VLOG_DBG("%s: closing monitor connection to %s: %s", + rconn_get_name(rc), vconn_get_name(vconn), + strerror(retval)); + rc->monitors[i] = rc->monitors[--rc->n_monitors]; + continue; + } + i++; + } + ofpbuf_delete(clone); +}